Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve example 12 byte manipulation #154

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions examples/00-example-generate-sensordata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,13 @@ producers:
topic: ksml_sensordata_avro
keyType: string
valueType: avro:SensorData

# The following producer generates messages for 12-example-byte-manipulation.yaml. Enable this producer if you
# want to run that specific example.
sensordata_avro_producer_binary:
generator: generate_sensordata_message
interval: 3s
to:
topic: ksml_sensordata_avro_binary
keyType: string
valueType: avro:SensorData
12 changes: 6 additions & 6 deletions examples/12-example-byte-manipulation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
# message contains a schema id that is not locally recognized. By changing bytes 1-4 in the value,
# one can override the schema id for further downstream processing.

# Yes, this is hacky, but it may serve a purpose for cases where binary copies are made from
# remote Kafka clusters that contain conflicting schema ids.
# Yes, this is hacky, but it may serve a purpose for cases where binary message copies are made from
# remote Kafka clusters with their own (possibly conflicting) schema ids.

streams:
sensor_source:
topic: ksml_sensordata_avro
sensor_binary_source:
topic: ksml_sensordata_avro_binary
keyType: string
valueType: bytes
offsetResetPolicy: latest
Expand All @@ -30,7 +30,7 @@ functions:
code: |
global newSchemaId
log.info("Replacing schema in message value: {}", value)
if value is not None:
if isinstance(value, list):
if value[0] == 0 and len(value) >= 5:
value[1] = (newSchemaId & 0xff000000) >> 24
value[2] = (newSchemaId & 0xff0000) >> 16
Expand All @@ -44,7 +44,7 @@ functions:

pipelines:
main:
from: sensor_source
from: sensor_binary_source
via:
- type: peek
forEach:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.axual.ksml.util.ExecutionUtil;
import org.graalvm.polyglot.Value;

import java.util.ArrayList;
import java.util.Map;

public class PythonDataObjectMapper extends NativeDataObjectMapper {
Expand Down Expand Up @@ -146,7 +147,13 @@ public Value fromDataObject(DataObject object) {
if (object instanceof DataLong val) return Value.asValue(val.value());
if (object instanceof DataFloat val) return Value.asValue(val.value());
if (object instanceof DataDouble val) return Value.asValue(val.value());
if (object instanceof DataBytes val) return Value.asValue(val.value());
if (object instanceof DataBytes val) {
// Copy the contained byte array into a List<Byte>, so that the PythonFunction wrapper code downstream
// can convert it to a Python list. Note that the elements are signed Java bytes (-128..127).
final var bytes = new ArrayList<Byte>(val.value().length);
for (byte b : val.value()) bytes.add(b);
return Value.asValue(bytes);
}
if (object instanceof DataString val) return Value.asValue(val.value());
if (object instanceof DataList val) return Value.asValue(fromDataList(val));
if (object instanceof DataStruct val) return Value.asValue(fromDataStruct(val));
Expand Down