Commit d16cd05

1D arrays supported

1 parent a60ec9f commit d16cd05

3 files changed (+311 additions, -11 deletions)

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 70 additions & 4 deletions
@@ -114,7 +114,7 @@ private Sender createRawSender() {
             }
             return Sender.fromConfig(sink);
         }
-        log.warn("Configuration options 'host', 'tsl', 'token' and 'username' are deprecated and will be removed in the future. Use 'client.conf.string' instead. See: https://questdb.io/docs/third-party-tools/kafka/questdb-kafka/#configuration-options");
+        log.warn("Configuration options 'host', 'tsl', 'token' and 'username' are deprecated and will be removed in the future. Use 'client.conf.string' instead. See: https://questdb.com/docs/third-party-tools/kafka/#configuration-manual");
         Sender.LineSenderBuilder builder = Sender.builder(Sender.Transport.TCP).address(config.getHost());
         if (config.isTls()) {
             builder.enableTls();
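
For reference, the 'client.conf.string' style that the updated warning points to can also be used directly with the QuestDB Java client. A minimal sketch, not part of this commit; the address and table name are illustrative:

import io.questdb.client.Sender;

public final class ConfStringExample {
    public static void main(String[] args) {
        // Same configuration-string format the connector accepts via 'client.conf.string'.
        try (Sender sender = Sender.fromConfig("http::addr=localhost:9000;")) {
            sender.table("example")
                    .doubleColumn("value", 42.0)
                    .atNow(); // let the server assign the designated timestamp
        }
    }
}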
@@ -483,6 +483,8 @@ private void writePhysicalTypeWithoutSchema(String name, Object value, String fa
         } else if (value instanceof java.util.Date) {
             long epochMillis = ((java.util.Date) value).getTime();
             sender.timestampColumn(actualName, TimeUnit.MILLISECONDS.toMicros(epochMillis), ChronoUnit.MICROS);
+        } else if (value instanceof List) {
+            handleArrayWithoutSchema(actualName, (List<?>) value);
         } else {
             onUnsupportedType(actualName, value.getClass().getName());
         }
@@ -539,15 +541,79 @@ private boolean tryWritePhysicalTypeFromSchema(String name, Schema schema, Objec
             case STRUCT:
                 handleStruct(name, (Struct) value, schema);
                 break;
-            case BYTES:
             case ARRAY:
+                handleArray(sanitizedName, value, schema);
+                break;
+            case BYTES:
             case MAP:
             default:
                 onUnsupportedType(name, type);
         }
         return true;
     }
 
+    private void handleArray(String name, Object value, Schema schema) {
+        if (value == null) {
+            return;
+        }
+
+        Schema valueSchema = schema.valueSchema();
+        if (valueSchema == null) {
+            throw new InvalidDataException("Array schema must have a value schema");
+        }
+
+        Schema.Type elementType = valueSchema.type();
+
+        if (elementType == Schema.Type.FLOAT32 || elementType == Schema.Type.FLOAT64) {
+            List<?> list = (List<?>) value;
+            // todo: do not allocate new arrays
+            double[] doubleArray = new double[list.size()];
+            for (int i = 0; i < list.size(); i++) {
+                Object element = list.get(i);
+                if (element == null) {
+                    throw new InvalidDataException("Array elements cannot be null for QuestDB double arrays");
+                }
+                doubleArray[i] = ((Number) element).doubleValue();
+            }
+            sender.doubleArray(name, doubleArray);
+        } else if (elementType == Schema.Type.ARRAY) {
+            onUnsupportedType(name, "Multidimensional ARRAY");
+        } else {
+            onUnsupportedType(name, "ARRAY<" + elementType + ">");
+        }
+    }
+
+    private void handleArrayWithoutSchema(String name, List<?> list) {
+        if (list == null || list.isEmpty()) {
+            return;
+        }
+
+        Object firstElement = list.get(0);
+        if (firstElement == null) {
+            throw new InvalidDataException("QuestDB array elements cannot be null");
+        }
+
+        if (firstElement instanceof Number) {
+            // todo: do not allocate new arrays
+            double[] doubleArray = new double[list.size()];
+            for (int i = 0; i < list.size(); i++) {
+                Object element = list.get(i);
+                if (element == null) {
+                    onUnsupportedType(name, "null element in ARRAY");
+                } else if (!(element instanceof Number)) {
+                    onUnsupportedType(name, "ARRAY<" + element.getClass().getSimpleName() + ">");
+                } else {
+                    doubleArray[i] = ((Number) element).doubleValue();
+                }
+            }
+            sender.doubleArray(name, doubleArray);
+        } else if (firstElement instanceof List) {
+            onUnsupportedType(name, "Multidimensional ARRAY");
+        } else {
+            onUnsupportedType(name, "ARRAY<" + firstElement.getClass().getSimpleName() + ">");
+        }
+    }
+
     private void onUnsupportedType(String name, Object type) {
         if (config.isSkipUnsupportedTypes()) {
             log.debug("Skipping unsupported type: {}, name: {}", type, name);
@@ -577,8 +643,8 @@ private boolean tryWriteLogicalType(String name, Schema schema, Object value) {
                 sender.timestampColumn(name, epochMillis, ChronoUnit.MILLIS);
                 return true;
             case Time.LOGICAL_NAME:
-                d = (java.util.Date) value;
-                long dayMillis = d.getTime();
+                java.util.Date timeValue = (java.util.Date) value;
+                long dayMillis = timeValue.getTime();
                 sender.longColumn(name, dayMillis);
                 return true;
             case Decimal.LOGICAL_NAME:
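
Both of the new handlers above funnel into the client's doubleArray() call. For orientation, a minimal standalone sketch of that Sender API, not part of this commit; the endpoint, table, and column names are illustrative:

import io.questdb.client.Sender;

public final class DoubleArrayExample {
    public static void main(String[] args) {
        // The HTTP endpoint on localhost:9000 is an assumption for this sketch;
        // the connector builds its Sender from the connector configuration instead.
        try (Sender sender = Sender.fromConfig("http::addr=localhost:9000;")) {
            sender.table("measurements")
                    .symbol("sensor_id", "sensor1")
                    .doubleArray("readings", new double[]{23.5, 24.1, 23.8}) // 1D array column
                    .atNow();
        }
    }
}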

connector/src/test/java/io/questdb/kafka/ConnectTestUtils.java

Lines changed: 3 additions & 2 deletions
@@ -57,8 +57,9 @@ static Map<String, String> baseConnectorProps(GenericContainer<?> questDBContain
             confString = "http::addr=" + host + ":" + port + ";";
             props.put("client.conf.string", confString);
         } else {
-            String ilpIUrl = host + ":" + questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT);
-            props.put("host", ilpIUrl);
+            int port = questDBContainer.getMappedPort(QuestDBUtils.QUESTDB_ILP_PORT);
+            confString = "tcp::addr=" + host + ":" + port + ";protocol_version=2;";
+            props.put("client.conf.string", confString);
         }
         return props;
     }
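
Note: the TCP test path now goes through the same 'client.conf.string' mechanism as the HTTP path instead of the deprecated 'host' property, and it pins protocol_version=2, presumably because the older ILP protocol revision cannot carry array values over TCP.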

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 238 additions & 5 deletions
@@ -52,7 +52,7 @@
 public final class QuestDBSinkConnectorEmbeddedTest {
     private static int httpPort = -1;
     private static int ilpPort = -1;
-    private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:8.2.0";
+    private static final String OFFICIAL_QUESTDB_DOCKER = "questdb/questdb:9.0.1";
     private static final boolean DUMP_QUESTDB_CONTAINER_LOGS = true;
 
     private EmbeddedConnectCluster connect;
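
Note: the image bump from 8.2.0 to 9.0.1 is presumably what enables the array tests added below, since the array column type arrived with the QuestDB 9.x line.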
@@ -957,7 +957,7 @@ public void testExactlyOnce_withDedup() throws BrokenBarrierException, Interrupt
 
         // make sure we have all records in the table
         QuestDBUtils.assertSqlEventually(
-                "\"count\"\r\n"
+                "\"count()\"\r\n"
                         + recordCount + "\r\n",
                 "select count(*) from " + topicName,
                 600,
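
Note: this expected-header change is unrelated to arrays; the newer QuestDB image evidently labels the count(*) result column "count()" in CSV exports, so the assertion follows suit.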
@@ -1545,15 +1545,28 @@ public void testJsonNoSchema_mixedFlotingAndIntTypes(boolean useHttp) {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testJsonNoSchema_ArrayNotSupported(boolean useHttp) {
+    public void testJsonNoSchema_intArraySendAsDoubleArray(boolean useHttp) {
+        // In schema-less mode, we have to be lenient with array element types,
+        // since floating-point numbers without an actual decimal point are
+        // instantiated as integers by Kafka Connect.
+        //
+        // This will become problematic once QuestDB supports arrays of integers,
+        // as it will not be able to distinguish between an array of integers and
+        // an array of doubles.
+        // For now, we just assume that all arrays are of doubles.
+
         connect.kafka().createTopic(topicName, 1);
         Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
         props.put("value.converter.schemas.enable", "false");
         connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
         ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
-        connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"array\":[1,2,3]}");
+        connect.kafka().produce(topicName, "key", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"arr\":[1,2,3]}");
 
-        ConnectTestUtils.assertConnectorTaskFailedEventually(connect);
+        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"arr\"\r\n"
+                        + "\"John\",\"Doe\",42,\"[1.0,2.0,3.0]\"\r\n",
+                "select firstname,lastname,age,arr from " + topicName,
+                httpPort);
     }
 
     @ParameterizedTest
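
The comment in the renamed test above captures the crux of the schema-less design: Kafka Connect's JSON converter materializes 1, 2, 3 as integral Numbers, so the connector widens every element to double before calling doubleArray(). A hypothetical standalone illustration of that widening (the names here are mine, not the connector's):

import java.util.Arrays;
import java.util.List;

public final class WideningExample {
    public static void main(String[] args) {
        // Schema-less JSON [1,2,3] arrives from Kafka Connect as integral Numbers (e.g. Longs).
        List<Number> parsed = Arrays.<Number>asList(1L, 2L, 3L);

        // Mirror of the connector's leniency: widen every element to double.
        double[] widened = new double[parsed.size()];
        for (int i = 0; i < parsed.size(); i++) {
            widened[i] = parsed.get(i).doubleValue();
        }

        System.out.println(Arrays.toString(widened)); // [1.0, 2.0, 3.0]
    }
}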
@@ -2016,4 +2029,224 @@ public void testMultiLevelNestedStructInValue(boolean useHttp) {
                 "select partner1_name_firstname, partner1_name_lastname, partner2_name_firstname, partner2_name_lastname from " + topicName,
                 httpPort);
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testFloat32ArraySupport(boolean useHttp) {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        // Create schema with float array
+        Schema arraySchema = SchemaBuilder.array(Schema.FLOAT32_SCHEMA).build();
+        Schema schema = SchemaBuilder.struct()
+                .name("com.example.Measurement")
+                .field("sensor_id", Schema.STRING_SCHEMA)
+                .field("readings", arraySchema)
+                .build();
+
+        Struct struct = new Struct(schema)
+                .put("sensor_id", "sensor1")
+                .put("readings", Arrays.asList(23.5f, 24.1f, 23.8f));
+
+        connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct)));
+
+        QuestDBUtils.assertSqlEventually(
+                "\"sensor_id\",\"readings\"\r\n" +
+                        "\"sensor1\",\"[23.5,24.100000381469727,23.799999237060547]\"\r\n",
+                "select sensor_id, readings from " + topicName,
+                httpPort
+        );
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testFloat64ArraySupport(boolean useHttp) {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        // Create schema with double array
+        Schema arraySchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build();
+        Schema schema = SchemaBuilder.struct()
+                .name("com.example.Measurement")
+                .field("device", Schema.STRING_SCHEMA)
+                .field("temperatures", arraySchema)
+                .build();
+
+        Struct struct = new Struct(schema)
+                .put("device", "thermometer1")
+                .put("temperatures", Arrays.asList(98.6, 99.1, 97.9));
+
+        connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct)));
+
+        QuestDBUtils.assertSqlEventually(
+                "\"device\",\"temperatures\"\r\n" +
+                        "\"thermometer1\",\"[98.6,99.1,97.9]\"\r\n",
+                "select device, temperatures from " + topicName,
+                httpPort
+        );
+    }
+
+    @Test
+    public void testSchemalessFloatArraySupport() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put("value.converter.schemas.enable", "false");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        // Send JSON with array of doubles
+        String json = "{\"location\":\"room1\",\"humidity_readings\":[45.2,46.8,44.9]}";
+        connect.kafka().produce(topicName, json);
+
+        QuestDBUtils.assertSqlEventually(
+                "\"location\",\"humidity_readings\"\r\n" +
+                        "\"room1\",\"[45.2,46.8,44.9]\"\r\n",
+                "select location, humidity_readings from " + topicName,
+                httpPort
+        );
+    }
+
+    @Test
+    public void testSchemalessFloatArraySupport_floatFollowedByInt() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put("value.converter.schemas.enable", "false");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        // Send JSON mixing floats and an integer; the integer is widened to a double
+        String json = "{\"location\":\"room1\",\"humidity_readings\":[45.0,46,44.9]}";
+        connect.kafka().produce(topicName, json);
+
+        QuestDBUtils.assertSqlEventually(
+                "\"location\",\"humidity_readings\"\r\n" +
+                        "\"room1\",\"[45.0,46.0,44.9]\"\r\n",
+                "select location, humidity_readings from " + topicName,
+                httpPort
+        );
+    }
+
+    @Test
+    public void testSchemalessFloatArraySupport_intFollowedByFloat() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put("value.converter.schemas.enable", "false");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        // Send JSON starting with an integer followed by floats; the integer is widened
+        String json = "{\"location\":\"room1\",\"humidity_readings\":[45,46.0,44.9]}";
+        connect.kafka().produce(topicName, json);
+
+        QuestDBUtils.assertSqlEventually(
+                "\"location\",\"humidity_readings\"\r\n" +
+                        "\"room1\",\"[45.0,46.0,44.9]\"\r\n",
+                "select location, humidity_readings from " + topicName,
+                httpPort
+        );
+    }
+
+
+    @Test
+    public void testIntegerArrayRejection() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
+        props.put("errors.tolerance", "none");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        // Create schema with integer array (should fail)
+        Schema arraySchema = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
+        Schema schema = SchemaBuilder.struct()
+                .name("com.example.Counter")
+                .field("name", Schema.STRING_SCHEMA)
+                .field("counts", arraySchema)
+                .build();
+
+        Struct struct = new Struct(schema)
+                .put("name", "counter1")
+                .put("counts", Arrays.asList(1, 2, 3));
+
+        connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct)));
+
+        // The connector should fail to process this record
+        ConnectTestUtils.assertConnectorTaskFailedEventually(connect);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testNestedStructWithArray(boolean useHttp) {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        Schema arraySchema = SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build();
+        Schema sensorSchema = SchemaBuilder.struct()
+                .field("type", Schema.STRING_SCHEMA)
+                .field("values", arraySchema)
+                .build();
+        Schema schema = SchemaBuilder.struct()
+                .name("com.example.Device")
+                .field("id", Schema.STRING_SCHEMA)
+                .field("sensor", sensorSchema)
+                .build();
+
+        Struct sensorStruct = new Struct(sensorSchema)
+                .put("type", "temperature")
+                .put("values", Arrays.asList(20.5, 21.0, 20.8));
+
+        Struct struct = new Struct(schema)
+                .put("id", "device1")
+                .put("sensor", sensorStruct);
+
+        connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct)));
+
+        QuestDBUtils.assertSqlEventually(
+                "\"id\",\"sensor_type\",\"sensor_values\"\r\n" +
+                        "\"device1\",\"temperature\",\"[20.5,21.0,20.8]\"\r\n",
+                "select id, sensor_type, sensor_values from " + topicName,
+                httpPort
+        );
+    }
+
+    @Test
+    public void testArrayWithSkipUnsupportedTypes() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put("skip.unsupported.types", "true");
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        // Create schema with string array (unsupported)
+        Schema arraySchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
+        Schema schema = SchemaBuilder.struct()
+                .name("com.example.Data")
+                .field("names", arraySchema)
+                .field("value", Schema.FLOAT64_SCHEMA)
+                .build();
+
+        Struct struct = new Struct(schema)
+                .put("names", Arrays.asList("a", "b", "c"))
+                .put("value", 42.0);
+
+        connect.kafka().produce(topicName, new String(converter.fromConnectData(topicName, schema, struct)));
+
+        // Verify - the string array should be skipped but the double field should be written
+        QuestDBUtils.assertSqlEventually(
+                "\"value\"\r\n" +
+                        "42.0\r\n",
+                "select value from " + topicName,
+                httpPort
+        );
+    }
 }
