diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java
index 047e4c4a07..8b4b2a729f 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java
@@ -373,19 +373,9 @@ private String trimKey(final String key) {
}
private boolean isValidKey(final String key) {
- char previous = ' ';
- char next = ' ';
for (int i = 0; i < key.length(); i++) {
char c = key.charAt(i);
- if (i < key.length() - 1) {
- next = key.charAt(i + 1);
- }
-
- if ((i == 0 || i == key.length() - 1 || previous == '/' || next == '/') && (c == '_' || c == '.' || c == '-')) {
- return false;
- }
-
if (!(c >= 48 && c <= 57
|| c >= 65 && c <= 90
|| c >= 97 && c <= 122
@@ -397,7 +387,6 @@ private boolean isValidKey(final String key) {
return false;
}
- previous = c;
}
return true;
}
diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java
index 9de73495f9..92b181ac8c 100644
--- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java
+++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java
@@ -323,9 +323,8 @@ public void testIsValueAList_withNull() {
}
@ParameterizedTest
- @ValueSource(strings = {"", "withSpecialChars*$%", "-withPrefixDash", "\\-withEscapeChars", "\\\\/withMultipleEscapeChars",
- "withDashSuffix-", "withDashSuffix-/nestedKey", "withDashPrefix/-nestedKey", "_withUnderscorePrefix", "withUnderscoreSuffix_",
- ".withDotPrefix", "withDotSuffix.", "with,Comma", "with:Colon", "with[Bracket", "with|Brace"})
+ @ValueSource(strings = {"", "withSpecialChars*$%", "\\-withEscapeChars", "\\\\/withMultipleEscapeChars",
+ "with,Comma", "with:Colon", "with[Bracket", "with|Brace"})
void testKey_withInvalidKey_throwsIllegalArgumentException(final String invalidKey) {
assertThrowsForKeyCheck(IllegalArgumentException.class, invalidKey);
}
diff --git a/data-prepper-plugins/opensearch-source/README.md b/data-prepper-plugins/opensearch-source/README.md
new file mode 100644
index 0000000000..a86c2e77ec
--- /dev/null
+++ b/data-prepper-plugins/opensearch-source/README.md
@@ -0,0 +1,69 @@
+# OpenSearch Source
+
+This is the Date Prepper OpenSearch source plugin that processes indices either OpenSearch, Elasticsearch,
+or Amazon OpenSearch Service. It is meant for migrating index data from a cluster.
+
+Note: Only fully tested versions with be listed below. It is likely many more versions are supported already, but it is untested.
+
+The OpenSearch source is compatible with the following OpenSearch versions:
+* 2.5
+
+And is compatible with the following Elasticsearch versions:
+* 7.10
+
+# Usages
+
+### Amazon OpenSearch Service
+
+The OpenSearch sink can also be configured for an Amazon OpenSearch Service domain. See [security](security.md) for details.
+
+```yaml
+opensearch-source-pipeline:
+ source:
+ opensearch:
+ connection:
+ insecure: true
+ hosts: [ "https://search-my-domain-soopywaovobopgs8ywurr3utsu.us-east-1.es.amazonaws.com" ]
+ aws:
+ region: "us-east-1"
+ sts_role_arn: "arn:aws:iam::123456789012:role/my-domain-role"
+```
+
+## Configuration
+
+- `hosts`: A list of IP addresses of OpenSearch or Elasticsearch nodes.
+
+
+- `username`:
+
+
+- `password`:
+
+
+- `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details. SigV4 is enabled by default when this option is used.
+
+
+- `search_options` (Optional) : See [Search Configuration](#search_configuration) for details
+
+
+- `indices` (Optional): See [Indices Configurations](#indices_configuration) for filtering options.
+
+
+- `scheduling` (Optional): See [Scheduling Configuration](#scheduling_configuration) for details
+
+
+- `connection` (Optional): See []
+
+### AWS Configuration
+
+* `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
+* `sts_role_arn` (Optional) : The STS role to assume for requests to AWS. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
+* `sts_header_overrides` (Optional): A map of header overrides to make when assuming the IAM role for the source plugin.
+
+### Search Configuration
+
+### Scheduling Configuration
+
+### Connection Configuration
+
+### Indices Configuration
\ No newline at end of file
diff --git a/data-prepper-plugins/opensearch-source/security.md b/data-prepper-plugins/opensearch-source/security.md
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java
index 99f04d3e33..e076b7de10 100644
--- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java
+++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java
@@ -64,34 +64,37 @@ public Collection> doExecute(final Collection> recor
final boolean doUsePointer = Objects.nonNull(pointer);
for (final Record record : records) {
- final Event event = record.getData();
- if (Objects.nonNull(parseWhen) && !expressionEvaluator.evaluateConditional(parseWhen, event)) {
- continue;
- }
-
- final String message = event.get(source, String.class);
- if (Objects.isNull(message)) {
- continue;
- }
-
- try {
- final TypeReference> hashMapTypeReference = new TypeReference>() {};
- Map parsedJson = objectMapper.readValue(message, hashMapTypeReference);
-
- if (doUsePointer) {
- parsedJson = parseUsingPointer(event, parsedJson, pointer, doWriteToRoot);
- }
-
- if (doWriteToRoot) {
- writeToRoot(event, parsedJson);
- } else {
- event.put(destination, parsedJson);
+ final Event event = record.getData();
+ try {
+ if (Objects.nonNull(parseWhen) && !expressionEvaluator.evaluateConditional(parseWhen, event)) {
+ continue;
+ }
+
+ final String message = event.get(source, String.class);
+ if (Objects.isNull(message)) {
+ continue;
+ }
+ final TypeReference> hashMapTypeReference = new TypeReference>() {
+ };
+ Map parsedJson = objectMapper.readValue(message, hashMapTypeReference);
+
+ if (doUsePointer) {
+ parsedJson = parseUsingPointer(event, parsedJson, pointer, doWriteToRoot);
+ }
+
+ if (doWriteToRoot) {
+ writeToRoot(event, parsedJson);
+ } else {
+ event.put(destination, parsedJson);
+ }
+ } catch (final JsonProcessingException jsonException) {
+ event.getMetadata().addTags(tagsOnFailure);
+ LOG.error(EVENT, "An exception occurred due to invalid JSON while reading event [{}]", event, jsonException);
+ } catch (final Exception e) {
+ event.getMetadata().addTags(tagsOnFailure);
+ LOG.error(EVENT, "An exception occurred while using the parse_json processor on Event [{}]", event, e);
}
- } catch (final JsonProcessingException jsonException) {
- event.getMetadata().addTags(tagsOnFailure);
- LOG.error(EVENT, "An exception occurred due to invalid JSON while reading event [{}]", event, jsonException);
- }
}
return records;
}