Skip to content

Commit 491c40c

Browse files
committed
support for 2D and 3D array in BufferedSender
1 parent 9c0a99a commit 491c40c

File tree

3 files changed

+202
-14
lines changed

3 files changed

+202
-14
lines changed

connector/src/main/java/io/questdb/kafka/BufferingSender.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ final class BufferingSender implements Sender {
3939
private final Set<CharSequence> symbolColumns = new HashSet<>();
4040
private final List<CharSequence> doubleArrayNames = new ArrayList<>(DEFAULT_CAPACITY);
4141
private final List<double[]> doubleArrayValues = new ArrayList<>(DEFAULT_CAPACITY);
42+
private final List<CharSequence> doubleArray2DNames = new ArrayList<>(DEFAULT_CAPACITY);
43+
private final List<double[][]> doubleArray2DValues = new ArrayList<>(DEFAULT_CAPACITY);
44+
private final List<CharSequence> doubleArray3DNames = new ArrayList<>(DEFAULT_CAPACITY);
45+
private final List<double[][][]> doubleArray3DValues = new ArrayList<>(DEFAULT_CAPACITY);
4246

4347
BufferingSender(Sender sender, String symbolColumns) {
4448
this.sender = sender;
@@ -121,6 +125,12 @@ public void cancelRow() {
121125
boolValues.clear();
122126
timestampNames.clear();
123127
timestampValues.clear();
128+
doubleArrayNames.clear();
129+
doubleArrayValues.clear();
130+
doubleArray2DNames.clear();
131+
doubleArray2DValues.clear();
132+
doubleArray3DNames.clear();
133+
doubleArray3DValues.clear();
124134

125135
sender.cancelRow();
126136
}
@@ -211,6 +221,22 @@ private void transferFields() {
211221
}
212222
doubleArrayNames.clear();
213223
doubleArrayValues.clear();
224+
225+
for (int i = 0, n = doubleArray2DNames.size(); i < n; i++) {
226+
CharSequence fieldName = doubleArray2DNames.get(i);
227+
double[][] fieldValue = doubleArray2DValues.get(i);
228+
sender.doubleArray(fieldName, fieldValue);
229+
}
230+
doubleArray2DNames.clear();
231+
doubleArray2DValues.clear();
232+
233+
for (int i = 0, n = doubleArray3DNames.size(); i < n; i++) {
234+
CharSequence fieldName = doubleArray3DNames.get(i);
235+
double[][][] fieldValue = doubleArray3DValues.get(i);
236+
sender.doubleArray(fieldName, fieldValue);
237+
}
238+
doubleArray3DNames.clear();
239+
doubleArray3DValues.clear();
214240
}
215241

216242
private static long unitToMicros(long value, ChronoUnit unit) {
@@ -258,12 +284,16 @@ public Sender doubleArray(CharSequence charSequence, double[] doubles) {
258284

259285
@Override
260286
public Sender doubleArray(CharSequence charSequence, double[][] doubles) {
261-
throw new UnsupportedOperationException("not implemented");
287+
doubleArray2DNames.add(charSequence);
288+
doubleArray2DValues.add(doubles);
289+
return this;
262290
}
263291

264292
@Override
265293
public Sender doubleArray(CharSequence charSequence, double[][][] doubles) {
266-
throw new UnsupportedOperationException("not implemented");
294+
doubleArray3DNames.add(charSequence);
295+
doubleArray3DValues.add(doubles);
296+
return this;
267297
}
268298

269299
@Override

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 79 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -580,12 +580,25 @@ private void handleArray(String name, Object value, Schema schema) {
580580
Schema nestedValueSchema = valueSchema.valueSchema();
581581
if (nestedValueSchema != null && (nestedValueSchema.type() == Schema.Type.FLOAT32 || nestedValueSchema.type() == Schema.Type.FLOAT64)) {
582582
List<?> list = (List<?>) value;
583+
584+
// First, validate that all rows have the same length (no jagged arrays)
585+
if (!list.isEmpty()) {
586+
int expectedRowLength = ((List<?>) list.get(0)).size();
587+
for (int i = 0; i < list.size(); i++) {
588+
Object row = list.get(i);
589+
if (row == null) {
590+
throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays");
591+
}
592+
List<?> rowList = (List<?>) row;
593+
if (rowList.size() != expectedRowLength) {
594+
throw new InvalidDataException("QuestDB does not support jagged arrays. All rows must have the same length. Expected: " + expectedRowLength + ", but row " + i + " has length: " + rowList.size());
595+
}
596+
}
597+
}
598+
583599
double[][] doubleArray2D = new double[list.size()][];
584600
for (int i = 0; i < list.size(); i++) {
585601
Object row = list.get(i);
586-
if (row == null) {
587-
throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays");
588-
}
589602
List<?> rowList = (List<?>) row;
590603
doubleArray2D[i] = new double[rowList.size()];
591604
for (int j = 0; j < rowList.size(); j++) {
@@ -601,19 +614,43 @@ private void handleArray(String name, Object value, Schema schema) {
601614
Schema nestedNestedValueSchema = nestedValueSchema.valueSchema();
602615
if (nestedNestedValueSchema != null && (nestedNestedValueSchema.type() == Schema.Type.FLOAT32 || nestedNestedValueSchema.type() == Schema.Type.FLOAT64)) {
603616
List<?> list = (List<?>) value;
617+
618+
// First, validate dimensions for 3D array (no jagged arrays)
619+
if (!list.isEmpty()) {
620+
List<?> firstMatrix = (List<?>) list.get(0);
621+
int expectedMatrixHeight = firstMatrix.size();
622+
int expectedRowLength = firstMatrix.isEmpty() ? 0 : ((List<?>) firstMatrix.get(0)).size();
623+
624+
for (int i = 0; i < list.size(); i++) {
625+
Object matrix = list.get(i);
626+
if (matrix == null) {
627+
throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays");
628+
}
629+
List<?> matrixList = (List<?>) matrix;
630+
if (matrixList.size() != expectedMatrixHeight) {
631+
throw new InvalidDataException("QuestDB does not support jagged arrays. All matrices must have the same height. Expected: " + expectedMatrixHeight + ", but matrix " + i + " has height: " + matrixList.size());
632+
}
633+
634+
for (int j = 0; j < matrixList.size(); j++) {
635+
Object row = matrixList.get(j);
636+
if (row == null) {
637+
throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays");
638+
}
639+
List<?> rowList = (List<?>) row;
640+
if (rowList.size() != expectedRowLength) {
641+
throw new InvalidDataException("QuestDB does not support jagged arrays. All rows must have the same length. Expected: " + expectedRowLength + ", but matrix " + i + " row " + j + " has length: " + rowList.size());
642+
}
643+
}
644+
}
645+
}
646+
604647
double[][][] doubleArray3D = new double[list.size()][][];
605648
for (int i = 0; i < list.size(); i++) {
606649
Object matrix = list.get(i);
607-
if (matrix == null) {
608-
throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays");
609-
}
610650
List<?> matrixList = (List<?>) matrix;
611651
doubleArray3D[i] = new double[matrixList.size()][];
612652
for (int j = 0; j < matrixList.size(); j++) {
613653
Object row = matrixList.get(j);
614-
if (row == null) {
615-
throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays");
616-
}
617654
List<?> rowList = (List<?>) row;
618655
doubleArray3D[i][j] = new double[rowList.size()];
619656
for (int k = 0; k < rowList.size(); k++) {
@@ -672,7 +709,8 @@ private void handleArrayWithoutSchema(String name, List<?> list) {
672709
}
673710

674711
if (firstNestedElement instanceof Number) {
675-
double[][] doubleArray2D = new double[list.size()][];
712+
// First, validate that all rows have the same length (no jagged arrays)
713+
int expectedRowLength = firstList.size();
676714
for (int i = 0; i < list.size(); i++) {
677715
Object row = list.get(i);
678716
if (row == null) {
@@ -682,6 +720,15 @@ private void handleArrayWithoutSchema(String name, List<?> list) {
682720
throw new InvalidDataException("QuestDB 2D array rows must be Lists");
683721
}
684722
List<?> rowList = (List<?>) row;
723+
if (rowList.size() != expectedRowLength) {
724+
throw new InvalidDataException("QuestDB does not support jagged arrays. All rows must have the same length. Expected: " + expectedRowLength + ", but row " + i + " has length: " + rowList.size());
725+
}
726+
}
727+
728+
double[][] doubleArray2D = new double[list.size()][];
729+
for (int i = 0; i < list.size(); i++) {
730+
Object row = list.get(i);
731+
List<?> rowList = (List<?>) row;
685732
doubleArray2D[i] = new double[rowList.size()];
686733
for (int j = 0; j < rowList.size(); j++) {
687734
Object element = rowList.get(j);
@@ -706,7 +753,10 @@ private void handleArrayWithoutSchema(String name, List<?> list) {
706753
}
707754

708755
if (firstNestedNestedElement instanceof Number) {
709-
double[][][] doubleArray3D = new double[list.size()][][];
756+
// First, validate dimensions for 3D array (no jagged arrays)
757+
int expectedMatrixHeight = firstNestedList.size();
758+
int expectedRowLength = firstNestedList.size() > 0 ? ((List<?>) firstNestedList.get(0)).size() : 0;
759+
710760
for (int i = 0; i < list.size(); i++) {
711761
Object matrix = list.get(i);
712762
if (matrix == null) {
@@ -716,7 +766,10 @@ private void handleArrayWithoutSchema(String name, List<?> list) {
716766
throw new InvalidDataException("QuestDB 3D array matrices must be Lists");
717767
}
718768
List<?> matrixList = (List<?>) matrix;
719-
doubleArray3D[i] = new double[matrixList.size()][];
769+
if (matrixList.size() != expectedMatrixHeight) {
770+
throw new InvalidDataException("QuestDB does not support jagged arrays. All matrices must have the same height. Expected: " + expectedMatrixHeight + ", but matrix " + i + " has height: " + matrixList.size());
771+
}
772+
720773
for (int j = 0; j < matrixList.size(); j++) {
721774
Object row = matrixList.get(j);
722775
if (row == null) {
@@ -726,6 +779,20 @@ private void handleArrayWithoutSchema(String name, List<?> list) {
726779
throw new InvalidDataException("QuestDB 3D array rows must be Lists");
727780
}
728781
List<?> rowList = (List<?>) row;
782+
if (rowList.size() != expectedRowLength) {
783+
throw new InvalidDataException("QuestDB does not support jagged arrays. All rows must have the same length. Expected: " + expectedRowLength + ", but matrix " + i + " row " + j + " has length: " + rowList.size());
784+
}
785+
}
786+
}
787+
788+
double[][][] doubleArray3D = new double[list.size()][][];
789+
for (int i = 0; i < list.size(); i++) {
790+
Object matrix = list.get(i);
791+
List<?> matrixList = (List<?>) matrix;
792+
doubleArray3D[i] = new double[matrixList.size()][];
793+
for (int j = 0; j < matrixList.size(); j++) {
794+
Object row = matrixList.get(j);
795+
List<?> rowList = (List<?>) row;
729796
doubleArray3D[i][j] = new double[rowList.size()];
730797
for (int k = 0; k < rowList.size(); k++) {
731798
Object element = rowList.get(k);

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2399,4 +2399,95 @@ public void test2DFloatArraySupport(boolean useHttp) {
23992399
httpPort
24002400
);
24012401
}
2402+
2403+
@Test
2404+
public void testJaggedArrayRejection() {
2405+
connect.kafka().createTopic(topicName, 1);
2406+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
2407+
props.put("value.converter.schemas.enable", "false");
2408+
props.put("errors.tolerance", "none");
2409+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
2410+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
2411+
2412+
// send JSON with jagged 2D array (different row lengths)
2413+
String json = "{\"experiment\":\"jagged\",\"results\":[[1.5,2.5,3.5],[4.5,5.5]]}";
2414+
connect.kafka().produce(topicName, json);
2415+
2416+
ConnectTestUtils.assertConnectorTaskFailedEventually(connect);
2417+
}
2418+
2419+
@ParameterizedTest
2420+
@ValueSource(booleans = {true, false})
2421+
public void test2DArrayWithSymbolColumns(boolean useHttp) {
2422+
connect.kafka().createTopic(topicName, 1);
2423+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
2424+
props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
2425+
props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "sensor_id");
2426+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
2427+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
2428+
2429+
// Create schema with 2D double array and symbol column
2430+
Schema innerArraySchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build();
2431+
Schema arraySchema = SchemaBuilder.array(innerArraySchema).build();
2432+
Schema schema = SchemaBuilder.struct()
2433+
.name("com.example.MatrixWithSymbol")
2434+
.field("sensor_id", Schema.STRING_SCHEMA)
2435+
.field("readings", arraySchema)
2436+
.build();
2437+
2438+
// Create 2D array data: [[1.1, 2.2], [3.3, 4.4]]
2439+
Struct struct = new Struct(schema)
2440+
.put("sensor_id", "sensor-001")
2441+
.put("readings", Arrays.asList(
2442+
Arrays.asList(1.1, 2.2),
2443+
Arrays.asList(3.3, 4.4)
2444+
));
2445+
2446+
connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct)));
2447+
2448+
QuestDBUtils.assertSqlEventually(
2449+
"\"sensor_id\",\"readings\"\r\n" +
2450+
"\"sensor-001\",\"[[1.1,2.2],[3.3,4.4]]\"\r\n",
2451+
"select sensor_id, readings from " + topicName,
2452+
httpPort
2453+
);
2454+
}
2455+
2456+
@ParameterizedTest
2457+
@ValueSource(booleans = {true, false})
2458+
public void test3DArrayWithSymbolColumns(boolean useHttp) {
2459+
connect.kafka().createTopic(topicName, 1);
2460+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
2461+
props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
2462+
props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "device_id");
2463+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
2464+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
2465+
2466+
// Create schema with 3D double array and symbol column
2467+
Schema innerArraySchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build();
2468+
Schema middleArraySchema = SchemaBuilder.array(innerArraySchema).build();
2469+
Schema arraySchema = SchemaBuilder.array(middleArraySchema).build();
2470+
Schema schema = SchemaBuilder.struct()
2471+
.name("com.example.TensorWithSymbol")
2472+
.field("device_id", Schema.STRING_SCHEMA)
2473+
.field("data", arraySchema)
2474+
.build();
2475+
2476+
// Create 3D array data: [[[1.0, 2.0]], [[3.0, 4.0]]]
2477+
Struct struct = new Struct(schema)
2478+
.put("device_id", "device-alpha")
2479+
.put("data", Arrays.asList(
2480+
Arrays.asList(Arrays.asList(1.0, 2.0)),
2481+
Arrays.asList(Arrays.asList(3.0, 4.0))
2482+
));
2483+
2484+
connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct)));
2485+
2486+
QuestDBUtils.assertSqlEventually(
2487+
"\"device_id\",\"data\"\r\n" +
2488+
"\"device-alpha\",\"[[[1.0,2.0]],[[3.0,4.0]]]\"\r\n",
2489+
"select device_id, data from " + topicName,
2490+
httpPort
2491+
);
2492+
}
24022493
}

0 commit comments

Comments
 (0)