
Commit dc72ef4

Introduce performance measurement example (#162)

* Split up example message generator into a normal one (interval), a binary one (intended to run example 12) and a batch one (intended for performance measurements)
* Introduce example 19 to perform generic performance measurements on any KSML pipeline
* Remove reading back from binary topic in example 1 to allow running all examples in parallel (i.e. no conflict with example 12)
* Remove dead code from TopologyGenerator
* Improve error message in UserTypeParser

1 parent 567d1f7 commit dc72ef4

61 files changed: +5783 −628 lines changed

docs/ksml-language-spec.json

Lines changed: 5120 additions & 1 deletion
Large diffs are not rendered by default.
examples/00-example-generate-sensordata-batch.yaml

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json
+
+# This example shows how to generate data and have it sent to a target topic in a given format.
+
+functions:
+  generate_sensordata_message:
+    type: generator
+    globalCode: |
+      import time
+      import random
+      sensorCounter = 0
+    code: |
+      global sensorCounter
+
+      key = "sensor"+str(sensorCounter)       # Set the key to return ("sensor0" to "sensor9")
+      sensorCounter = (sensorCounter+1) % 10  # Increase the counter for next iteration
+
+      # Generate some random sensor measurement data
+      types = { 0: { "type": "AREA", "unit": random.choice([ "m2", "ft2" ]), "value": str(random.randrange(1000)) },
+                1: { "type": "HUMIDITY", "unit": random.choice([ "g/m3", "%" ]), "value": str(random.randrange(100)) },
+                2: { "type": "LENGTH", "unit": random.choice([ "m", "ft" ]), "value": str(random.randrange(1000)) },
+                3: { "type": "STATE", "unit": "state", "value": random.choice([ "off", "on" ]) },
+                4: { "type": "TEMPERATURE", "unit": random.choice([ "C", "F" ]), "value": str(random.randrange(-100, 100)) }
+                }
+
+      # Build the result value using any of the above measurement types
+      value = { "name": key, "timestamp": str(round(time.time()*1000)), **random.choice(types) }
+      value["color"] = random.choice([ "black", "blue", "red", "yellow", "white" ])
+      value["owner"] = random.choice([ "Alice", "Bob", "Charlie", "Dave", "Evan" ])
+      value["city"] = random.choice([ "Amsterdam", "Xanten", "Utrecht", "Alkmaar", "Leiden" ])
+
+      if random.randrange(10) == 0:
+        key = None
+      if random.randrange(10) == 0:
+        value = None
+    expression: (key, value)        # Return a message tuple with the key and value
+    resultType: (string, json)      # Indicate the type of key and value
+
+producers:
+  # Produce 10k messages in batches of 100 messages with a 1ms interval
+  sensordata_avro_producer:
+    generator: generate_sensordata_message
+    interval: 1
+    count: 10000
+    batchSize: 100
+    to:
+      topic: ksml_sensordata_avro
+      keyType: string
+      valueType: avro:SensorData
examples/00-example-generate-sensordata-binary.yaml

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json
+
+# This example shows how to generate data and have it sent to a target topic in a given format.
+
+functions:
+  generate_sensordata_message:
+    type: generator
+    globalCode: |
+      import time
+      import random
+      sensorCounter = 0
+    code: |
+      global sensorCounter
+
+      key = "sensor"+str(sensorCounter)       # Set the key to return ("sensor0" to "sensor9")
+      sensorCounter = (sensorCounter+1) % 10  # Increase the counter for next iteration
+
+      # Generate some random sensor measurement data
+      types = { 0: { "type": "AREA", "unit": random.choice([ "m2", "ft2" ]), "value": str(random.randrange(1000)) },
+                1: { "type": "HUMIDITY", "unit": random.choice([ "g/m3", "%" ]), "value": str(random.randrange(100)) },
+                2: { "type": "LENGTH", "unit": random.choice([ "m", "ft" ]), "value": str(random.randrange(1000)) },
+                3: { "type": "STATE", "unit": "state", "value": random.choice([ "off", "on" ]) },
+                4: { "type": "TEMPERATURE", "unit": random.choice([ "C", "F" ]), "value": str(random.randrange(-100, 100)) }
+                }
+
+      # Build the result value using any of the above measurement types
+      value = { "name": key, "timestamp": str(round(time.time()*1000)), **random.choice(types) }
+      value["color"] = random.choice([ "black", "blue", "red", "yellow", "white" ])
+      value["owner"] = random.choice([ "Alice", "Bob", "Charlie", "Dave", "Evan" ])
+      value["city"] = random.choice([ "Amsterdam", "Xanten", "Utrecht", "Alkmaar", "Leiden" ])
+
+      if random.randrange(10) == 0:
+        key = None
+      if random.randrange(10) == 0:
+        value = None
+    expression: (key, value)        # Return a message tuple with the key and value
+    resultType: (string, json)      # Indicate the type of key and value
+
+producers:
+  # This example uses the otherAvro notation, using the Apicurio Registry API and serializers.
+  # See the ksml-data-generator.yaml for the notation definition
+  sensordata_avro_producer_binary:
+    generator: generate_sensordata_message
+    interval: 3s
+    to:
+      topic: ksml_sensordata_avro_binary
+      keyType: string
+      valueType: otherAvro:SensorData

examples/00-example-generate-sensordata.yaml

Lines changed: 1 addition & 11 deletions
@@ -1,4 +1,4 @@
-# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json
+# $schema: https://raw.githubusercontent.com/Axual/ksml/main/docs/ksml-language-spec.json
 
 # This example shows how to generate data and have it sent to a target topic in a given format.
 
@@ -44,13 +44,3 @@ producers:
       topic: ksml_sensordata_avro
       keyType: string
      valueType: avro:SensorData
-
-  # This example uses the otherAvro notation, using the Apicurio Registry API and serializers.
-  # See the ksml-data-generator.yaml for the notation definition
-  sensordata_avro_producer_binary:
-    generator: generate_sensordata_message
-    interval: 3s
-    to:
-      topic: ksml_sensordata_avro_binary
-      keyType: string
-      valueType: otherAvro:SensorData

examples/01-example-inspect.yaml

Lines changed: 0 additions & 8 deletions
@@ -8,10 +8,6 @@ streams:
     keyType: string
     valueType: avro:SensorData
     offsetResetPolicy: latest
-  sensor_source_avro_binary:
-    topic: ksml_sensordata_avro_binary
-    keyType: string
-    valueType: otherAvro:SensorData
   sensor_source_csv:
     topic: ksml_sensordata_csv
     keyType: string
@@ -40,10 +36,6 @@ pipelines:
     from: sensor_source_avro
     forEach:
      code: log_message(key, value, format="AVRO")
-  consume_avro_binary:
-    from: sensor_source_avro_binary
-    forEach:
-      code: log_message(key, value, format="OTHER_AVRO")
   consume_csv:
     from: sensor_source_csv
     forEach:
examples/19-example-performance-measurement.yaml

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+# $schema: https://raw.githubusercontent.com/Axual/ksml/refs/heads/main/docs/ksml-language-spec.json
+
+# This example behaves similarly to example 2 (copy) but includes Python code to measure KSML's performance. It does
+# so by storing the message count and startup timestamp in global variables, and outputting log statements every 100
+# #messages, containing #messages processed, #seconds running since first message came in, and average #msg/sec.
+
+streams:
+  sensor_source:
+    topic: ksml_sensordata_avro
+    keyType: string
+    valueType: avro:SensorData
+    offsetResetPolicy: latest
+  sensor_copy:
+    topic: ksml_sensordata_copy
+    keyType: string
+    valueType: avro:SensorData
+
+pipelines:
+  main:
+    from: sensor_source
+    via:
+      # Use a PEEK operation to initialize the global messageCount and startTime
+      - type: peek
+        forEach:
+          globalCode: |
+            from datetime import datetime
+            messageCount, startTime = 0, 0
+          code: |
+            # Declare global variables, since we are updating them below
+            global messageCount, startTime
+            if messageCount == 0:
+              startTime = datetime.now()
+            messageCount += 1
+      # Output performance thus far, done in separate PEEK to allow easy insertion of other operations above
+      - type: peek
+        forEach:
+          code: |
+            # No need to include the global statement here, since we only read and don't update the global variables
+            # For every 100 messages that we process, we output a log statement with performance indication so far
+            if messageCount % 100 == 0:
+              # Prevent division by zero by using 1 second as minimum
+              runtime = max(1, (datetime.now() - startTime).total_seconds())
+              log.warn("Processed {} messages in {} seconds = {} msg/sec", messageCount, runtime, round(messageCount / runtime, 2))
+    to: sensor_copy

examples/ksml-data-generator.yaml

Lines changed: 4 additions & 5 deletions
@@ -42,7 +42,7 @@ ksml:
     ## Below this line, specify properties to be passed into Confluent's KafkaAvroSerializer and KafkaAvroDeserializer
     config:
       # The example uses Apicurio, using the Apicurio Confluent Compatibility URL
-      schema.registry.url: http://schema_registry:8081/apis/ccompat/v7
+      schema.registry.url: http://schema-registry:8081/apis/ccompat/v7
       auto.register.schemas: true
       normalize.schemas: true
       # Below is an example SSL configuration for Confluent Serialization library
@@ -81,15 +81,16 @@ ksml:
       # apicurio.registry.request.ssl.truststore.type: JKS
       # apicurio.registry.request.ssl.truststore.password: password
 
-
   enableProducers: true    # Set to true to allow producer definitions to be parsed in the KSML definitions and be executed.
   enablePipelines: false   # Set to true to allow pipeline definitions to be parsed in the KSML definitions and be executed.
 
   # Section where you specify which KSML definitions to load, parse and execute.
   definitions:
     # Format is <namespace>: <ksml_definition_filename>
-    # generate_alert_setting: 00-example-generate-alertsettings.yaml
+    generate_alert_setting: 00-example-generate-alertsettings.yaml
     generate_sensor_data: 00-example-generate-sensordata.yaml
+    # generate_sensor_data_batch: 00-example-generate-sensordata-batch.yaml
+    # generate_sensor_data_binary: 00-example-generate-sensordata-binary.yaml
 
 # This setup connects to the Kafka broker and schema registry started with the example docker-compose file
 # These examples are intended to run from a inside a container on the same network
@@ -118,12 +119,10 @@ kafka:
   # These patterns are resolved into the actual name used on Kafka using the values in this configuration map
   # and the topic names specified in the definition YAML files
 
-
   # tenant: "ksmldemo"
   # instance: "dta"
   # environment: "dev"
   # topic.pattern: "{tenant}-{instance}-{environment}-{topic}"
   # # Results in Kafka topic ksmldemo-dta-dev-<topic name from KSML definition YAML>
   # group.id.pattern: "{tenant}-{instance}-{environment}-{group.id}"
   # transactional.id.pattern: "{tenant}-{instance}-{environment}-{transactional.id}"
-

examples/ksml-runner.yaml

Lines changed: 2 additions & 2 deletions
@@ -42,7 +42,7 @@ ksml:
     ## Below this line, specify properties to be passed into Confluent's KafkaAvroSerializer and KafkaAvroDeserializer
     config:
       # Link to Apicurio Confluent Compatibility URL
-      schema.registry.url: http://schema_registry:8081/apis/ccompat/v7
+      schema.registry.url: http://schema-registry:8081/apis/ccompat/v7
       auto.register.schemas: true
       normalize.schemas: true
       # Below is an example SSL configuration for Confluent Serialization library
@@ -105,6 +105,7 @@ ksml:
     # transform_metadata: 16-example-transform-metadata.yaml
     # inspect_with_metrics: 17-example-inspect-with-metrics.yaml
     # timestamp_extractor: 18-example-timestamp-extractor.yaml
+    # performance-measurement: 19-example-performance-measurement.yaml
 
 # This setup connects to the Kafka broker and schema registry started with the example docker-compose file
 # These examples are intended to run from a inside a container on the same network
@@ -119,7 +120,6 @@ kafka:
   auto.offset.reset: earliest
   acks: all
 
-
 # These are Kafka SSL configuration properties. Check the documentation at
 # Check the documentation at https://kafka.apache.org/documentation/#producerconfigs for more properties
 

ksml-data-csv/src/main/java/io/axual/ksml/data/notation/csv/CsvSchemaMapper.java

Lines changed: 9 additions & 1 deletion
@@ -22,6 +22,7 @@
 
 import io.axual.ksml.data.mapper.DataSchemaMapper;
 import io.axual.ksml.data.object.DataList;
+import io.axual.ksml.data.object.DataObject;
 import io.axual.ksml.data.schema.DataField;
 import io.axual.ksml.data.schema.DataSchema;
 import io.axual.ksml.data.schema.DataValue;
@@ -48,7 +49,14 @@ public DataSchema toDataSchema(String namespace, String name, String value) {
     private DataSchema toDataSchema(String namespace, String name, DataList fieldNames) {
         List<DataField> fields = new ArrayList<>();
         for (var fieldName : fieldNames) {
-            fields.add(new DataField(fieldName.toString(), DataSchema.create(DataSchema.Type.STRING), fieldName.toString(), NO_INDEX, true, false, new DataValue("")));
+            fields.add(new DataField(
+                    fieldName.toString(DataObject.Printer.INTERNAL),
+                    DataSchema.create(DataSchema.Type.STRING),
+                    fieldName.toString(DataObject.Printer.INTERNAL),
+                    NO_INDEX,
+                    true,
+                    false,
+                    new DataValue("")));
         }
         return new StructSchema(namespace, name, "CSV schema", fields);
     }

ksml-data-xml/src/main/java/io/axual/ksml/data/notation/xml/XmlDataObjectMapper.java

Lines changed: 1 addition & 1 deletion
@@ -230,7 +230,7 @@ private void elementFromDataObject(ElementCreator elementCreator, Element elemen
             return;
         }
         if (value != null) {
-            element.setTextContent(value.toString());
+            element.setTextContent(value.toString(DataObject.Printer.INTERNAL));
         }
     }
 }

ksml-data-xml/src/main/java/io/axual/ksml/data/notation/xml/XmlSchemaMapper.java

Lines changed: 9 additions & 2 deletions
@@ -170,15 +170,22 @@ private DataField parseField(DataStruct fieldStruct) {
         var fieldType = fieldStruct.get(ATTRIBUTES_ELEMENT_NAME) instanceof DataStruct attributeStruct ? attributeStruct.get(TYPE_NAME) : null;
         if (fieldType instanceof DataString fieldTypeString) {
             var type = fieldTypeString.value().contains(":") ? fieldTypeString.value().substring(fieldTypeString.value().indexOf(":") + 1) : fieldTypeString.value();
-            return simpleField(fieldName.toString(), type);
+            return simpleField(fieldName.toString(DataObject.Printer.INTERNAL), type);
         } else {
             // Field type is not specified, so dig down into the elements below to find out the type
             var complexTypeElement = fieldStruct.get(COMPLEX_TYPE_NAME);
             if (complexTypeElement instanceof DataStruct complexTypeStruct) {
                 var sequenceElement = complexTypeStruct.get(SEQUENCE_NAME);
                 if (sequenceElement instanceof DataStruct sequenceStruct) {
                     var fields = parseFields(sequenceStruct);
-                    return new DataField(fieldName.toString(), new StructSchema(null, fieldName.toString(), "Converted from XSD", fields), null);
+                    return new DataField(
+                            fieldName.toString(DataObject.Printer.INTERNAL),
+                            new StructSchema(
+                                    null,
+                                    fieldName.toString(DataObject.Printer.INTERNAL),
+                                    "Converted from XSD",
+                                    fields),
+                            null);
                 }
             }
         }

ksml-data/src/main/java/io/axual/ksml/data/notation/NotationLibrary.java

Lines changed: 1 addition & 1 deletion
@@ -39,7 +39,7 @@ public static boolean exists(String notation) {
         return notations.containsKey(notation);
     }
 
-    public static Notation notation(String notation) {
+    public static Notation get(String notation) {
         var result = notation != null ? notations.get(notation) : null;
         if (result != null) return result;
         throw new DataException("Data notation is not registered in the NotationLibrary: " + (notation != null ? notation : "null"));
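For callers, the rename from notation() to get() reads as below — a minimal sketch, assuming a notation has been registered elsewhere under the name "json" (both the name and the registration step are assumptions, not part of this diff):

import io.axual.ksml.data.notation.Notation;
import io.axual.ksml.data.notation.NotationLibrary;

class NotationLookupExample {
    static Notation lookupJsonNotation() {
        // Guard with exists() to avoid the DataException that get() throws for unregistered notations
        if (NotationLibrary.exists("json")) {
            return NotationLibrary.get("json");  // was: NotationLibrary.notation("json")
        }
        return null;
    }
}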

ksml-data/src/main/java/io/axual/ksml/data/notation/json/JsonDataObjectConverter.java

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@
 import io.axual.ksml.data.type.UnionType;
 
 public class JsonDataObjectConverter implements NotationConverter {
-    private static final JsonDataObjectMapper DATA_OBJECT_MAPPER = new JsonDataObjectMapper();
+    private static final JsonDataObjectMapper DATA_OBJECT_MAPPER = new JsonDataObjectMapper(false);
 
     @Override
     public DataObject convert(DataObject value, UserType targetType) {

ksml-data/src/main/java/io/axual/ksml/data/notation/json/JsonDataObjectMapper.java

Lines changed: 7 additions & 3 deletions
@@ -28,17 +28,21 @@
 
 public class JsonDataObjectMapper implements DataObjectMapper<String> {
     private static final NativeDataObjectMapper NATIVE_MAPPER = new NativeDataObjectMapper();
-    private static final StringMapper<Object> STRING_MAPPER = new JsonStringMapper();
+    private final StringMapper<Object> stringMapper;
+
+    public JsonDataObjectMapper(boolean prettyPrint) {
+        stringMapper = new JsonStringMapper(prettyPrint);
+    }
 
     @Override
     public DataObject toDataObject(DataType expected, String value) {
-        var object = STRING_MAPPER.fromString(value);
+        var object = stringMapper.fromString(value);
         return NATIVE_MAPPER.toDataObject(expected, object);
     }
 
     @Override
     public String fromDataObject(DataObject value) {
         var object = NATIVE_MAPPER.fromDataObject(value);
-        return STRING_MAPPER.toString(object);
+        return stringMapper.toString(object);
     }
 }
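With the new constructor flag, callers choose compact or pretty-printed JSON when constructing the mapper. A minimal usage sketch; the sample JSON string is made up, and passing null as the expected DataType (letting the mapper infer the type) is an assumption not shown in this diff:

import io.axual.ksml.data.notation.json.JsonDataObjectMapper;
import io.axual.ksml.data.object.DataObject;

class JsonMapperExample {
    public static void main(String[] args) {
        var compact = new JsonDataObjectMapper(false);  // as used by JsonDataObjectConverter and JsonSchemaLoader
        var pretty = new JsonDataObjectMapper(true);    // hypothetical use, e.g. for readable logging

        // Parse a JSON string into the internal DataObject representation
        // (null expected type is an assumption; the diff does not show how it is handled)
        DataObject parsed = compact.toDataObject(null, "{\"name\":\"sensor0\",\"value\":\"42\"}");

        // Serialize back to JSON: one mapper emits compact output, the other pretty-printed output
        System.out.println(compact.fromDataObject(parsed));
        System.out.println(pretty.fromDataObject(parsed));
    }
}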

ksml-data/src/main/java/io/axual/ksml/data/notation/json/JsonSchemaLoader.java

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@
 
 @Slf4j
 public class JsonSchemaLoader extends SchemaLoader {
-    private static final JsonSchemaMapper MAPPER = new JsonSchemaMapper();
+    private static final JsonSchemaMapper MAPPER = new JsonSchemaMapper(false);
 
     public JsonSchemaLoader(String schemaDirectory) {
         super("JSON", schemaDirectory, ".json");

ksml-data/src/main/java/io/axual/ksml/data/notation/json/JsonSchemaMapper.java

Lines changed: 7 additions & 3 deletions
@@ -35,7 +35,6 @@
 import static io.axual.ksml.data.schema.DataField.NO_INDEX;
 
 public class JsonSchemaMapper implements DataSchemaMapper<String> {
-    private static final JsonDataObjectMapper MAPPER = new JsonDataObjectMapper();
     private static final String TITLE_NAME = "title";
     private static final String DESCRIPTION_NAME = "description";
     private static final String TYPE_NAME = "type";
@@ -55,11 +54,16 @@ public class JsonSchemaMapper implements DataSchemaMapper<String> {
     private static final String NUMBER_TYPE = "number";
     private static final String OBJECT_TYPE = "object";
     private static final String STRING_TYPE = "string";
+    private final JsonDataObjectMapper mapper;
+
+    public JsonSchemaMapper(boolean prettyPrint) {
+        mapper = new JsonDataObjectMapper(prettyPrint);
+    }
 
     @Override
     public DataSchema toDataSchema(String namespace, String name, String value) {
         // Convert JSON to internal DataObject format
-        var schema = MAPPER.toDataObject(value);
+        var schema = mapper.toDataObject(value);
         if (schema instanceof DataStruct schemaStruct) {
             return toDataSchema(namespace, name, schemaStruct);
         }
@@ -135,7 +139,7 @@ public String fromDataSchema(DataSchema schema) {
             final var result = fromDataSchema(structSchema);
             // First translate the schema into DataObjects
             // The use the mapper to convert it into JSON
-            return MAPPER.fromDataObject(result);
+            return mapper.fromDataObject(result);
         }
         return null;
     }
