Skip to content

Commit 82da49a

Browse files
committed
support for symbols
1 parent d16cd05 commit 82da49a

File tree

3 files changed

+15
-1
lines changed

3 files changed

+15
-1
lines changed

connector/src/main/java/io/questdb/kafka/BufferingSender.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ final class BufferingSender implements Sender {
3737
private final List<CharSequence> symbolColumnNames = new ArrayList<>(DEFAULT_CAPACITY);
3838
private final List<CharSequence> symbolColumnValues = new ArrayList<>(DEFAULT_CAPACITY);
3939
private final Set<CharSequence> symbolColumns = new HashSet<>();
40+
private final List<CharSequence> doubleArrayNames = new ArrayList<>(DEFAULT_CAPACITY);
41+
private final List<double[]> doubleArrayValues = new ArrayList<>(DEFAULT_CAPACITY);
4042

4143
BufferingSender(Sender sender, String symbolColumns) {
4244
this.sender = sender;
@@ -201,6 +203,14 @@ private void transferFields() {
201203
}
202204
timestampNames.clear();
203205
timestampValues.clear();
206+
207+
for (int i = 0, n = doubleArrayNames.size(); i < n; i++) {
208+
CharSequence fieldName = doubleArrayNames.get(i);
209+
double[] fieldValue = doubleArrayValues.get(i);
210+
sender.doubleArray(fieldName, fieldValue);
211+
}
212+
doubleArrayNames.clear();
213+
doubleArrayValues.clear();
204214
}
205215

206216
private static long unitToMicros(long value, ChronoUnit unit) {
@@ -241,7 +251,9 @@ public void close() {
241251

242252
@Override
243253
public Sender doubleArray(CharSequence charSequence, double[] doubles) {
244-
throw new UnsupportedOperationException("not implemented");
254+
doubleArrayNames.add(charSequence);
255+
doubleArrayValues.add(doubles);
256+
return this;
245257
}
246258

247259
@Override

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,7 @@ private void handleArray(String name, Object value, Schema schema) {
577577
}
578578
sender.doubleArray(name, doubleArray);
579579
} else if (elementType == Schema.Type.ARRAY) {
580+
// todo: handle multidimensional arrays
580581
onUnsupportedType(name, "Multidimensional ARRAY");
581582
} else {
582583
onUnsupportedType(name, "ARRAY<" + elementType + ">");

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2067,6 +2067,7 @@ public void testFloat64ArraySupport(boolean useHttp) {
20672067
connect.kafka().createTopic(topicName, 1);
20682068
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
20692069
props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
2070+
props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "devices");
20702071
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
20712072
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
20722073

0 commit comments

Comments
 (0)