Skip to content

Commit

Permalink
Correct the delete by query endpoint to match the OpenSearch API
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Farr <tsfarr@amazon.com>
  • Loading branch information
Xtansia committed Nov 1, 2023
1 parent 933194d commit 8b41ab5
Show file tree
Hide file tree
Showing 14 changed files with 99 additions and 28 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/build_hive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@ jobs:
uses: gradle/gradle-build-action@v2

- name: Build with Gradle
run: ./gradlew opensearch-hadoop-hive:build
run: ./gradlew opensearch-hadoop-hive:build

- name: Publish Test Results
if: failure()
uses: actions/upload-artifact@v2
with:
name: test-results-${{ matrix.os }}
path: '**/build/test-results'
9 changes: 8 additions & 1 deletion .github/workflows/build_mr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@ jobs:
uses: gradle/gradle-build-action@v2

- name: Build with Gradle
run: ./gradlew opensearch-hadoop-mr:build
run: ./gradlew opensearch-hadoop-mr:build

- name: Publish Test Results
if: failure()
uses: actions/upload-artifact@v2
with:
name: test-results-${{ matrix.os }}
path: '**/build/test-results'
9 changes: 8 additions & 1 deletion .github/workflows/build_spark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@ jobs:
uses: gradle/gradle-build-action@v2

- name: Build with Gradle
run: ./gradlew opensearch-spark:build
run: ./gradlew opensearch-spark:build

- name: Publish Test Results
if: failure()
uses: actions/upload-artifact@v2
with:
name: test-results-${{ matrix.os }}
path: '**/build/test-results'
9 changes: 8 additions & 1 deletion .github/workflows/build_spark_20.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@ jobs:
uses: gradle/gradle-build-action@v2

- name: Build with Gradle
run: ./gradlew opensearch-spark-20:integrationTest
run: ./gradlew opensearch-spark-20:integrationTest

- name: Publish Test Results
if: failure()
uses: actions/upload-artifact@v2
with:
name: test-results-${{ matrix.os }}
path: '**/build/test-results'
9 changes: 8 additions & 1 deletion .github/workflows/build_spark_20_scala_210.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@ jobs:
uses: gradle/gradle-build-action@v2

- name: Build with Gradle
run: ./gradlew opensearch-spark-20:integrationTestSpark20scala210
run: ./gradlew opensearch-spark-20:integrationTestSpark20scala210

- name: Publish Test Results
if: failure()
uses: actions/upload-artifact@v2
with:
name: test-results-${{ matrix.os }}
path: '**/build/test-results'
9 changes: 8 additions & 1 deletion .github/workflows/build_spark_20_scala_211.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@ jobs:
uses: gradle/gradle-build-action@v2

- name: Build with Gradle
run: ./gradlew opensearch-spark-20:integrationTestSpark20scala210
run: ./gradlew opensearch-spark-20:integrationTestSpark20scala210

- name: Publish Test Results
if: failure()
uses: actions/upload-artifact@v2
with:
name: test-results-${{ matrix.os }}
path: '**/build/test-results'
9 changes: 8 additions & 1 deletion .github/workflows/build_spark_30.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@ jobs:
uses: gradle/gradle-build-action@v2

- name: Build with Gradle
run: ./gradlew opensearch-spark-30:integrationTest
run: ./gradlew opensearch-spark-30:integrationTest

- name: Publish Test Results
if: failure()
uses: actions/upload-artifact@v2
with:
name: test-results-${{ matrix.os }}
path: '**/build/test-results'
9 changes: 8 additions & 1 deletion .github/workflows/build_spark_30_scala_213.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@ jobs:
uses: gradle/gradle-build-action@v2

- name: Build with Gradle
run: ./gradlew opensearch-spark-30:integrationTestSpark30scala213
run: ./gradlew opensearch-spark-30:integrationTestSpark30scala213

- name: Publish Test Results
if: failure()
uses: actions/upload-artifact@v2
with:
name: test-results-${{ matrix.os }}
path: '**/build/test-results'
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Removed

### Fixed
- Corrected the delete by query endpoint to match the OpenSearch API ([#350](https://github.com/opensearch-project/opensearch-hadoop/pull/350))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class AbstractRestQueryTest {
@Before
public void start() throws IOException {
version = TestUtils.getOpenSearchClusterInfo().getMajorVersion();
settings = new TestSettings("rest/savebulk");
settings = new TestSettings("rest_save_bulk");
settings.setInternalVersion(version);
//testSettings.setPort(9200)
settings.setProperty(ConfigurationOptions.OPENSEARCH_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Ignore;
import org.opensearch.hadoop.OpenSearchHadoopIllegalArgumentException;
import org.opensearch.hadoop.cfg.ConfigurationOptions;
import org.opensearch.hadoop.cfg.Settings;
Expand Down Expand Up @@ -66,9 +67,11 @@ public class AbstractRestSaveTest {

private static final Log LOG = LogFactory.getLog(AbstractRestSaveTest.class);

private static final JsonUtils.Query HITS_TOTAL_VALUE = JsonUtils.query("hits").get("total").get("value");

@Test
public void testBulkWrite() throws Exception {
TestSettings testSettings = new TestSettings("rest/savebulk");
TestSettings testSettings = new TestSettings("rest_save_bulk");
//testSettings.setPort(9200)
testSettings.setProperty(ConfigurationOptions.OPENSEARCH_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
RestRepository client = new RestRepository(testSettings);
Expand All @@ -83,16 +86,18 @@ public void testBulkWrite() throws Exception {
line.put("name", in.next());
line.put("url", in.next());
line.put("picture", in.next());
in.nextLine();
client.writeToIndex(line);
line.clear();
}

client.close();
}

@Ignore("OpenSearch throws an error on empty bulk request")
@Test
public void testEmptyBulkWrite() throws Exception {
TestSettings testSettings = new TestSettings("rest/emptybulk");
TestSettings testSettings = new TestSettings("rest_empty_bulk");
testSettings.setInternalClusterInfo(TestUtils.getOpenSearchClusterInfo());
testSettings.setProperty(ConfigurationOptions.OPENSEARCH_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
RestRepository restRepo = new RestRepository(testSettings);
Expand All @@ -105,8 +110,9 @@ public void testEmptyBulkWrite() throws Exception {

@Test
public void testRepositoryDelete() throws Exception {
Settings settings = new TestSettings("rest/deletebulk");
RestUtils.delete("rest");
String index = "rest_delete_bulk";
Settings settings = new TestSettings(index);
RestUtils.delete(index);
InitializationUtils.discoverClusterInfo(settings, LOG);
settings.setProperty(ConfigurationOptions.OPENSEARCH_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
settings.setProperty(ConfigurationOptions.OPENSEARCH_MAPPING_DEFAULT_EXTRACTOR_CLASS, ConstantFieldExtractor.class.getName());
Expand All @@ -120,18 +126,18 @@ public void testRepositoryDelete() throws Exception {
String doc = "{\"index\":{\"_id\":\"" + StringUtils.jsonEncoding(id) + "\"}}\n{\"field\":1}\n";
repository.writeProcessedToIndex(new BytesArray(doc));
repository.flush();
RestUtils.refresh("rest");
RestUtils.refresh(index);

assertThat(JsonUtils.query("hits").get("total").apply(JsonUtils.asMap(RestUtils.get("rest/deletebulk/_search"))), is(equalTo(1)));
assertThat(HITS_TOTAL_VALUE.apply(JsonUtils.asMap(RestUtils.get(index + "/_search"))), is(equalTo(1)));

repository.delete();

assertThat(JsonUtils.query("hits").get("total").apply(JsonUtils.asMap(RestUtils.get("rest/deletebulk/_search"))), is(equalTo(0)));
assertThat(HITS_TOTAL_VALUE.apply(JsonUtils.asMap(RestUtils.get(index + "/_search"))), is(equalTo(0)));
}

@Test
public void testRepositoryDeleteEmptyIndex() throws Exception {
Settings settings = new TestSettings("delete_empty/test");
Settings settings = new TestSettings("delete_empty");
RestUtils.delete("delete_empty");
InitializationUtils.discoverClusterInfo(settings, LOG);
settings.setProperty(ConfigurationOptions.OPENSEARCH_SERIALIZATION_WRITER_VALUE_CLASS, JdkValueWriter.class.getName());
Expand All @@ -143,11 +149,11 @@ public void testRepositoryDeleteEmptyIndex() throws Exception {
RestRepository repository = new RestRepository(settings);
repository.touch();

assertThat(JsonUtils.query("hits").get("total").apply(JsonUtils.asMap(RestUtils.get("delete_empty/test/_search"))), is(equalTo(0)));
assertThat(HITS_TOTAL_VALUE.apply(JsonUtils.asMap(RestUtils.get("delete_empty/_search"))), is(equalTo(0)));

repository.delete();

assertThat(JsonUtils.query("hits").get("total").apply(JsonUtils.asMap(RestUtils.get("delete_empty/test/_search"))), is(equalTo(0)));
assertThat(HITS_TOTAL_VALUE.apply(JsonUtils.asMap(RestUtils.get("delete_empty/_search"))), is(equalTo(0)));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

@Ignore
@RunWith(Suite.class)
@Suite.SuiteClasses({ AbstractRestSaveTest.class, AbstractRestQueryTest.class })
public class RestSuite {
Expand Down
9 changes: 8 additions & 1 deletion mr/src/main/java/org/opensearch/hadoop/rest/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public int getResponseCode() {
public BulkActionResponse bulk(Resource resource, TrackingBytesArray data) {
// NB: dynamically get the stats since the transport can change
long start = network.transportStats().netTotalTime;
Response response = execute(PUT, resource.bulk(), data);
Response response = execute(POST, resource.bulk(), data);
long spent = network.transportStats().netTotalTime - start;

stats.bulkTotal++;
Expand Down Expand Up @@ -499,6 +499,13 @@ public boolean delete(String indexOrType) {
return (res.status() == HttpStatus.OK ? true : false);
}

public int deleteByQuery(String indexOrType, QueryBuilder query) {
BytesArray body = searchRequest(query);
Request req = new SimpleRequest(POST, null, indexOrType + "/_delete_by_query", body);
Response res = executeNotFoundAllowed(req);
return parseContent(res.body(), "deleted");
}

public boolean deleteScroll(String scrollId) {
BytesArray body = new BytesArray(("{\"scroll_id\":[\"" + scrollId + "\"]}").getBytes(StringUtils.UTF_8));
Request req = new SimpleRequest(DELETE, null, "_search/scroll", body);
Expand Down
16 changes: 9 additions & 7 deletions mr/src/main/java/org/opensearch/hadoop/rest/RestRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.hadoop.cfg.Settings;
import org.opensearch.hadoop.rest.bulk.BulkProcessor;
import org.opensearch.hadoop.rest.bulk.BulkResponse;
import org.opensearch.hadoop.rest.query.MatchAllQueryBuilder;
import org.opensearch.hadoop.rest.query.QueryUtils;
import org.opensearch.hadoop.rest.stats.Stats;
import org.opensearch.hadoop.rest.stats.StatsAware;
Expand Down Expand Up @@ -376,15 +377,16 @@ public boolean touch() {
}

public void delete() {
// try first a blind delete by query (since the plugin might be installed)
// try first a blind delete by query
try {
if (resources.getResourceWrite().isTyped()) {
client.delete(resources.getResourceWrite().index() + "/" + resources.getResourceWrite().type() + "/_query?q=*");
} else {
client.delete(resources.getResourceWrite().index() + "/_query?q=*");
}
Resource res = resources.getResourceWrite();
client.deleteByQuery(
res.isTyped()
? res.index() + "/" + res.type()
: res.index(),
MatchAllQueryBuilder.MATCH_ALL);
} catch (OpenSearchHadoopInvalidRequest ehir) {
log.info("Skipping delete by query as the plugin is not installed...");
log.error("Delete by query was not successful...", ehir);
}

// in ES 2.0 and higher this means scrolling and deleting the docs by hand...
Expand Down

0 comments on commit 8b41ab5

Please sign in to comment.