From 67f9d1d6bfb1e554ec1144e0ccdabd1f7b77694d Mon Sep 17 00:00:00 2001
From: eemhu <125959687+eemhu@users.noreply.github.com>
Date: Mon, 2 Dec 2024 13:33:28 +0200
Subject: [PATCH] Separate json records into multiple messages (#40)
* implement JsonRecords object which parses json records type event into separate messages.
* JsonRecords.java: make rv immutable; use try-with-resources with JsonReader
Co-authored-by: Moonbow-1 <146731818+MoonBow-1@users.noreply.github.com>
* comments for JsonRecords.java event passthroughs
* make expected messages explicit
* add EqualsVerifier and test for JsonRecords.
---------
Co-authored-by: Moonbow-1 <146731818+MoonBow-1@users.noreply.github.com>
---
pom.xml | 18 +++
.../com/teragrep/aer_02/SyslogBridge.java | 8 +-
.../com/teragrep/aer_02/json/JsonRecords.java | 113 ++++++++++++++++++
.../com/teragrep/aer_02/SyslogBridgeTest.java | 57 +++++++++
.../teragrep/aer_02/json/JsonRecordsTest.java | 108 +++++++++++++++++
5 files changed, 302 insertions(+), 2 deletions(-)
create mode 100644 src/main/java/com/teragrep/aer_02/json/JsonRecords.java
create mode 100644 src/test/java/com/teragrep/aer_02/json/JsonRecordsTest.java
diff --git a/pom.xml b/pom.xml
index 06f7733..61082e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,6 +137,13 @@
5.9.2
test
+
+
+ nl.jqno.equalsverifier
+ equalsverifier
+ 3.17.4
+ test
+
@@ -162,6 +169,17 @@
slf4j-api
2.0.7
+
+
+ jakarta.json
+ jakarta.json-api
+ 2.1.3
+
+
+ org.eclipse.parsson
+ parsson
+ 1.1.7
+
${project.basedir}/target
diff --git a/src/main/java/com/teragrep/aer_02/SyslogBridge.java b/src/main/java/com/teragrep/aer_02/SyslogBridge.java
index 0076ae8..4f5601e 100644
--- a/src/main/java/com/teragrep/aer_02/SyslogBridge.java
+++ b/src/main/java/com/teragrep/aer_02/SyslogBridge.java
@@ -49,6 +49,7 @@
import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.*;
import com.teragrep.aer_02.config.source.EnvironmentSource;
+import com.teragrep.aer_02.json.JsonRecords;
import com.teragrep.aer_02.metrics.JmxReport;
import com.teragrep.aer_02.metrics.PrometheusReport;
import com.teragrep.aer_02.metrics.Report;
@@ -144,8 +145,11 @@ public void eventHubTriggerToSyslog(
if (events[index] != null) {
final ZonedDateTime et = ZonedDateTime.parse(enqueuedTimeUtcArray.get(index) + "Z"); // needed as the UTC time presented does not have a TZ
context.getLogger().fine("Accepting event: " + events[index]);
- consumer
- .accept(events[index], partitionContext, et, offsetArray.get(index), propertiesArray[index], systemPropertiesArray[index]);
+ final String[] records = new JsonRecords(events[index]).records();
+ for (final String record : records) {
+ consumer
+ .accept(record, partitionContext, et, offsetArray.get(index), propertiesArray[index], systemPropertiesArray[index]);
+ }
}
else {
context.getLogger().warning("eventHubTriggerToSyslog event data is null");
diff --git a/src/main/java/com/teragrep/aer_02/json/JsonRecords.java b/src/main/java/com/teragrep/aer_02/json/JsonRecords.java
new file mode 100644
index 0000000..a89733a
--- /dev/null
+++ b/src/main/java/com/teragrep/aer_02/json/JsonRecords.java
@@ -0,0 +1,113 @@
+/*
+ * Teragrep Eventhub Reader as an Azure Function
+ * Copyright (C) 2024 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+package com.teragrep.aer_02.json;
+
+import jakarta.json.*;
+import jakarta.json.stream.JsonParsingException;
+
+import java.io.StringReader;
+import java.util.Objects;
+
+public final class JsonRecords {
+
+ private final String event;
+
+ public JsonRecords(final String event) {
+ this.event = event;
+ }
+
+ /**
+ * Expects {"records":[{},{}, ..., {}]}
type JSON string.
+ *
+ * @return individual records as an array or the original event.
+ */
+ public String[] records() {
+ final String[] rv = new String[] {
+ event
+ };
+
+ final JsonStructure mainStructure;
+ try (final JsonReader reader = Json.createReader(new StringReader(event))) {
+ mainStructure = reader.read();
+ }
+ catch (JsonParsingException e) {
+ // pass event through as-is if JSON parsing fails
+ return rv;
+ }
+
+ final JsonValue recordsStructure = mainStructure.getValue("/records");
+
+ if (recordsStructure.getValueType().equals(JsonValue.ValueType.ARRAY)) {
+ final JsonArray recordsArray = recordsStructure.asJsonArray();
+ String[] records = new String[recordsArray.size()];
+
+ for (int i = 0; i < recordsArray.size(); i++) {
+ // Take string representation of inner value regardless of actual datatype
+ records[i] = recordsArray.get(i).toString();
+ }
+
+ return records;
+ }
+
+ // pass event through as-is if "records" is not an array type
+ return rv;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ JsonRecords that = (JsonRecords) o;
+ return Objects.equals(event, that.event);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(event);
+ }
+}
diff --git a/src/test/java/com/teragrep/aer_02/SyslogBridgeTest.java b/src/test/java/com/teragrep/aer_02/SyslogBridgeTest.java
index 25ecd9e..f16e324 100644
--- a/src/test/java/com/teragrep/aer_02/SyslogBridgeTest.java
+++ b/src/test/java/com/teragrep/aer_02/SyslogBridgeTest.java
@@ -60,6 +60,7 @@
import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate;
import com.teragrep.rlp_03.frame.delegate.FrameContext;
import com.teragrep.rlp_03.frame.delegate.FrameDelegate;
+import jakarta.json.Json;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -154,6 +155,62 @@ void testSyslogBridge() {
Assertions.assertEquals(3, loops);
}
+ @Test
+ void testSyslogBridgeWithJsonRecordsData() {
+ PartitionContextFake pcf = new PartitionContextFake("eventhub.123", "test1", "$Default", "0");
+ Map props = new HashMap<>();
+ final SyslogBridge bridge = new SyslogBridge();
+
+ final String jsonRecords = Json
+ .createObjectBuilder()
+ .add("records", Json.createArrayBuilder().add("record1").add("record2").add("record3").build())
+ .build()
+ .toString();
+
+ bridge.eventHubTriggerToSyslog(new String[] {
+ jsonRecords, jsonRecords, jsonRecords
+ }, pcf.asMap(), new Map[] {
+ props, props, props
+ }, new Map[] {
+ new SystemPropsFake("0").asMap(), new SystemPropsFake("1").asMap(), new SystemPropsFake("2").asMap()
+ }, Arrays.asList("2010-01-01T00:00:00", "2010-01-02T00:00:00", "2010-01-03T00:00:00"),
+ Arrays.asList("0", "1", "2"), new ExecutionContextFake()
+ );
+
+ // there are 3 JSON records-type events with 3 records each, totalling 9 messages
+ Assertions.assertEquals(9, messages.size());
+
+ final String[] expectedSeqNums = new String[] {
+ "0", "0", "0", "1", "1", "1", "2", "2", "2"
+ };
+
+ final String[] expectedMessages = new String[] {
+ "\"record1\"",
+ "\"record2\"",
+ "\"record3\"",
+ "\"record1\"",
+ "\"record2\"",
+ "\"record3\"",
+ "\"record1\"",
+ "\"record2\"",
+ "\"record3\""
+ };
+
+ int loops = 0;
+ for (String message : messages) {
+ final RFC5424Frame frame = new RFC5424Frame(false);
+ frame.load(new ByteArrayInputStream(message.getBytes(StandardCharsets.UTF_8)));
+ Assertions.assertTrue(Assertions.assertDoesNotThrow(frame::next));
+ Assertions.assertEquals(expectedMessages[loops], frame.msg.toString());
+ Assertions.assertEquals("localhost.localdomain", frame.hostname.toString());
+ Assertions.assertEquals("aer-02", frame.appName.toString());
+ Assertions.assertEquals(expectedSeqNums[loops], frame.msgId.toString());
+ loops++;
+ }
+
+ Assertions.assertEquals(9, loops);
+ }
+
@Test
void testSyslogBridgeMetrics() {
PartitionContextFake pcf = new PartitionContextFake("eventhub.123", "test1", "$Default", "0");
diff --git a/src/test/java/com/teragrep/aer_02/json/JsonRecordsTest.java b/src/test/java/com/teragrep/aer_02/json/JsonRecordsTest.java
new file mode 100644
index 0000000..0d58735
--- /dev/null
+++ b/src/test/java/com/teragrep/aer_02/json/JsonRecordsTest.java
@@ -0,0 +1,108 @@
+/*
+ * Teragrep Eventhub Reader as an Azure Function
+ * Copyright (C) 2024 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+package com.teragrep.aer_02.json;
+
+import jakarta.json.Json;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class JsonRecordsTest {
+
+ @Test
+ void testRecordsAsObjectsCase() {
+ final String records = Json
+ .createObjectBuilder()
+ .add("records", Json.createArrayBuilder().add(Json.createObjectBuilder().add("a", "b")).add(Json.createObjectBuilder().add("c", "d"))).build().toString();
+ JsonRecords jr = new JsonRecords(records);
+ final String[] result = jr.records();
+ Assertions.assertEquals(2, result.length);
+ Assertions.assertEquals("{\"a\":\"b\"}", result[0]);
+ Assertions.assertEquals("{\"c\":\"d\"}", result[1]);
+ }
+
+ @Test
+ void testRecordsAsStringsAndNumbersCase() {
+ final String records = Json
+ .createObjectBuilder()
+ .add("records", Json.createArrayBuilder().add("abc").add(123).build())
+ .build()
+ .toString();
+ JsonRecords jr = new JsonRecords(records);
+ final String[] result = jr.records();
+ Assertions.assertEquals(2, result.length);
+ Assertions.assertEquals("\"abc\"", result[0]);
+ Assertions.assertEquals("123", result[1]);
+ }
+
+ @Test
+ void testNonJsonRecordsCase() {
+ final String records = "{{]///...<>;";
+ JsonRecords jr = new JsonRecords(records);
+ final String[] result = jr.records();
+ Assertions.assertEquals(1, result.length);
+ Assertions.assertEquals("{{]///...<>;", result[0]);
+ }
+
+ @Test
+ void testEquals() {
+ JsonRecords jr = new JsonRecords("{\"a\":\"b\",\"c\":\"d\"}");
+ JsonRecords jr2 = new JsonRecords("{\"a\":\"b\",\"c\":\"d\"}");
+ Assertions.assertEquals(jr, jr2);
+ }
+
+ @Test
+ void testNotEquals() {
+ JsonRecords jr = new JsonRecords("{\"a\":\"b\",\"c\":\"d\"}");
+ JsonRecords jr2 = new JsonRecords("{\"e\":\"f\",\"g\":\"h\"}");
+ Assertions.assertNotEquals(jr, jr2);
+ }
+
+ @Test
+ void testEqualsContract() {
+ EqualsVerifier.forClass(JsonRecords.class).verify();
+ }
+}