Skip to content

Commit

Permalink
Fix for transformation from key value to value list
Browse files Browse the repository at this point in the history
  • Loading branch information
jeroenvandisseldorp committed Jul 26, 2024
1 parent 48ecbc5 commit 3744773
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.axual.ksml.data.type.DataType;
import io.axual.ksml.data.type.ListType;
import io.axual.ksml.definition.FunctionDefinition;
import io.axual.ksml.exception.TopologyException;
import io.axual.ksml.generator.TopologyBuildContext;
import io.axual.ksml.operation.processor.FixedKeyOperationProcessorSupplier;
import io.axual.ksml.operation.processor.TransformKeyValueToValueListProcessor;
Expand Down Expand Up @@ -52,8 +53,12 @@ public StreamWrapper apply(KStreamWrapper input, TopologyBuildContext context) {
checkNotNull(mapper, MAPPER_NAME.toLowerCase());
final var k = input.keyType();
final var v = input.valueType();
final var vr = streamDataTypeOf(firstSpecificType(mapper, new UserType(new ListType(DataType.UNKNOWN))), false);
final var map = userFunctionOf(context, MAPPER_NAME, mapper, subOf(vr), superOf(k.flatten()), superOf(v.flatten()));
final var vrs = streamDataTypeOf(firstSpecificType(mapper, new UserType(new ListType(DataType.UNKNOWN))), false);
if (!(vrs.userType().dataType() instanceof ListType vrList)) {
throw new TopologyException("Function should return a list of values, but currently returns " + vrs);
}
final var vr = streamDataTypeOf(vrList.valueType(), false);
final var map = userFunctionOf(context, MAPPER_NAME, mapper, subOf(vrs), superOf(k.flatten()), superOf(v.flatten()));
final var userMap = new UserKeyValueToValueListTransformer(map, tags);
final var storeNames = mapper.storeNames().toArray(String[]::new);
final var supplier = new FixedKeyOperationProcessorSupplier<>(
Expand Down
4 changes: 2 additions & 2 deletions ksml/src/main/java/io/axual/ksml/python/PythonContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public PythonContext() {
.allowHostSocketAccess(false)
.build())
// .allowIO(IOAccess.ALL)
.allowNativeAccess(false)
// .allowNativeAccess(true)
// .allowNativeAccess(false)
.allowNativeAccess(true)
.allowPolyglotAccess(
PolyglotAccess.newBuilder()
.allowBindingsAccess(PYTHON)
Expand Down

0 comments on commit 3744773

Please sign in to comment.