diff --git a/ksml/src/main/java/io/axual/ksml/operation/JoinOperation.java b/ksml/src/main/java/io/axual/ksml/operation/JoinOperation.java index ad3f5036..b24d109d 100644 --- a/ksml/src/main/java/io/axual/ksml/operation/JoinOperation.java +++ b/ksml/src/main/java/io/axual/ksml/operation/JoinOperation.java @@ -77,9 +77,16 @@ public StreamWrapper apply(KStreamWrapper input) { final var v = input.valueType(); if (joinStream instanceof KStreamWrapper otherStream) { + /* Kafka Streams method signature: + KStream join(final KStream otherStream, + final ValueJoiner joiner, + final JoinWindows windows, + final StreamJoined streamJoined); + */ + final var ko = otherStream.keyType(); final var vo = otherStream.valueType(); final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vo, v), false); - checkType("Join stream keyType", vo, equalTo(k)); + checkType("Join stream keyType", ko, equalTo(k)); checkFunction(VALUEJOINER_NAME, valueJoiner, vr, superOf(v), superOf(vo)); var joined = StreamJoined.with(k.getSerde(), v.getSerde(), vo.getSerde()); if (name != null) joined = joined.withName(name); @@ -100,9 +107,10 @@ public StreamWrapper apply(KStreamWrapper input) { * final Joined joined) */ + final var kt = otherTable.keyType(); final var vt = otherTable.valueType(); final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vt, v), false); - checkType("Join table keyType", otherTable.keyType(), equalTo(k)); + checkType("Join table keyType", kt, equalTo(k)); checkFunction(VALUEJOINER_NAME, valueJoiner, vr, superOf(v), superOf(vt)); var joined = Joined.with(k.getSerde(), v.getSerde(), vt.getSerde()); if (name != null) joined = joined.withName(name); @@ -123,7 +131,7 @@ public StreamWrapper apply(KStreamWrapper input) { final var gk = otherGlobalKTable.keyType(); final var gv = otherGlobalKTable.valueType(); final var rv = streamDataTypeOf(firstSpecificType(valueJoiner, gv, v), false); - checkType("Join globalKTable keyType", otherGlobalKTable.keyType(), equalTo(k)); + checkType("Join globalKTable keyType", gk, equalTo(k)); checkFunction(KEYSELECTOR_NAME, keyValueMapper, subOf(gk), gk, superOf(k), superOf(v)); checkFunction(VALUEJOINER_NAME, valueJoiner, rv, superOf(v), superOf(gv)); final var output = name != null @@ -148,7 +156,7 @@ public StreamWrapper apply(KTableWrapper input) { final var k = input.keyType(); final var v = input.valueType(); - if (joinStream instanceof KTableWrapper kTableWrapper) { + if (joinStream instanceof KTableWrapper otherTable) { /* Kafka Streams method signature: * KTable join( * final KTable other, @@ -157,16 +165,17 @@ public StreamWrapper apply(KTableWrapper input) { * final Materialized> materialized) */ - final var vo = kTableWrapper.valueType(); + final var ko = otherTable.keyType(); + final var vo = otherTable.valueType(); final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vo, v), false); - checkType("Join table keyType", kTableWrapper.keyType(), equalTo(k)); + checkType("Join table keyType", ko, equalTo(k)); checkFunction(VALUEJOINER_NAME, valueJoiner, subOf(vr), vr, superOf(v), superOf(vo)); final var kvStore = validateKeyValueStore(store, k, vr); if (kvStore != null) { final var mat = materialize(kvStore); final var output = name != null - ? input.table.join(kTableWrapper.table, new UserValueJoiner(valueJoiner), Named.as(name), mat) - : input.table.join(kTableWrapper.table, new UserValueJoiner(valueJoiner), mat); + ? input.table.join(otherTable.table, new UserValueJoiner(valueJoiner), Named.as(name), mat) + : input.table.join(otherTable.table, new UserValueJoiner(valueJoiner), mat); return new KTableWrapper(output, k, vr); } } diff --git a/ksml/src/main/java/io/axual/ksml/operation/LeftJoinOperation.java b/ksml/src/main/java/io/axual/ksml/operation/LeftJoinOperation.java index ecc20b3c..0aaef971 100644 --- a/ksml/src/main/java/io/axual/ksml/operation/LeftJoinOperation.java +++ b/ksml/src/main/java/io/axual/ksml/operation/LeftJoinOperation.java @@ -64,10 +64,17 @@ public StreamWrapper apply(KStreamWrapper input) { final var k = input.keyType(); final var v = input.valueType(); - if (joinStream instanceof KStreamWrapper kStreamWrapper) { - final var vo = kStreamWrapper.valueType(); + if (joinStream instanceof KStreamWrapper otherStream) { + /* Kafka Streams method signature: + * KStream leftJoin(final KStream otherStream, + * final ValueJoiner joiner, + * final JoinWindows windows, + * final StreamJoined streamJoined); + */ + final var ko = otherStream.keyType(); + final var vo = otherStream.valueType(); final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vo, v), false); - checkType("Join stream keyType", kStreamWrapper.keyType(), equalTo(k)); + checkType("Join stream keyType", ko, equalTo(k)); checkFunction(VALUEJOINER_NAME, valueJoiner, vr, superOf(v), superOf(vo)); var joined = StreamJoined.with(k.getSerde(), v.getSerde(), vo.getSerde()); if (name != null) joined = joined.withName(name); @@ -77,29 +84,29 @@ public StreamWrapper apply(KStreamWrapper input) { } final var output = (KStream) input.stream.leftJoin( - kStreamWrapper.stream, + otherStream.stream, new UserValueJoiner(valueJoiner), JoinWindows.ofTimeDifferenceWithNoGrace(joinWindowsDuration), joined); return new KStreamWrapper(output, k, vr); } - if (joinStream instanceof KTableWrapper kTableWrapper) { + if (joinStream instanceof KTableWrapper otherTable) { /* Kafka Streams method signature: * KStream leftJoin( * final KTable table, * final ValueJoiner joiner, * final Joined joined) */ - - final var vt = kTableWrapper.valueType(); + final var kt = otherTable.keyType(); + final var vt = otherTable.valueType(); final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vt, v), false); - checkType("Join table keyType", kTableWrapper.keyType(), equalTo(k)); + checkType("Join table keyType", kt, equalTo(k)); checkFunction(VALUEJOINER_NAME, valueJoiner, vr, superOf(v), superOf(vt)); var joined = Joined.with(k.getSerde(), v.getSerde(), vt.getSerde()); if (name != null) joined = joined.withName(name); final var output = (KStream) input.stream.leftJoin( - kTableWrapper.table, + otherTable.table, new UserValueJoiner(valueJoiner), joined); return new KStreamWrapper(output, k, vr); @@ -113,7 +120,7 @@ public StreamWrapper apply(KTableWrapper input) { final var k = input.keyType(); final var v = input.valueType(); - if (joinStream instanceof KTableWrapper kTableWrapper) { + if (joinStream instanceof KTableWrapper otherTable) { /* Kafka Streams method signature: * KTable leftJoin( * final KTable other, @@ -121,23 +128,23 @@ public StreamWrapper apply(KTableWrapper input) { * final Named named, * final Materialized> materialized) */ - - final var vo = kTableWrapper.valueType(); + final var ko = otherTable.keyType(); + final var vo = otherTable.valueType(); final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vo, v), false); - checkType("Join table keyType", kTableWrapper.keyType(), equalTo(k)); + checkType("Join table keyType", ko, equalTo(k)); checkFunction(VALUEJOINER_NAME, valueJoiner, subOf(vr), vr, superOf(v), superOf(vo)); final var kvStore = validateKeyValueStore(store, k, vr); if (kvStore != null) { final var mat = materialize(kvStore); final var output = name != null - ? input.table.leftJoin(kTableWrapper.table, new UserValueJoiner(valueJoiner), Named.as(name), mat) - : input.table.leftJoin(kTableWrapper.table, new UserValueJoiner(valueJoiner), mat); + ? input.table.leftJoin(otherTable.table, new UserValueJoiner(valueJoiner), Named.as(name), mat) + : input.table.leftJoin(otherTable.table, new UserValueJoiner(valueJoiner), mat); return new KTableWrapper(output, k, vr); } final var output = name != null - ? (KTable) input.table.leftJoin(kTableWrapper.table, new UserValueJoiner(valueJoiner), Named.as(name)) - : (KTable) input.table.leftJoin(kTableWrapper.table, new UserValueJoiner(valueJoiner)); + ? (KTable) input.table.leftJoin(otherTable.table, new UserValueJoiner(valueJoiner), Named.as(name)) + : (KTable) input.table.leftJoin(otherTable.table, new UserValueJoiner(valueJoiner)); return new KTableWrapper(output, k, vr); } diff --git a/ksml/src/main/java/io/axual/ksml/operation/OuterJoinOperation.java b/ksml/src/main/java/io/axual/ksml/operation/OuterJoinOperation.java index cf4aed66..924331df 100644 --- a/ksml/src/main/java/io/axual/ksml/operation/OuterJoinOperation.java +++ b/ksml/src/main/java/io/axual/ksml/operation/OuterJoinOperation.java @@ -57,9 +57,16 @@ public StreamWrapper apply(KStreamWrapper input) { final var v = input.valueType(); if (joinStream instanceof KStreamWrapper otherStream) { + /* Kafka Streams method signature: + * KStream outerJoin(final KStream otherStream, + * final ValueJoiner joiner, + * final JoinWindows windows, + * final StreamJoined streamJoined); + */ + final var ko = otherStream.keyType(); final var vo = otherStream.valueType(); final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vo, v), false); - checkType("Join stream keyType", otherStream.keyType().userType().dataType(), equalTo(k)); + checkType("Join stream keyType", ko, equalTo(k)); checkFunction(VALUEJOINER_NAME, valueJoiner, vr, superOf(v), superOf(vo)); var joined = StreamJoined.with(k.getSerde(), v.getSerde(), vo.getSerde()); if (name != null) joined = joined.withName(name); @@ -88,11 +95,11 @@ public StreamWrapper apply(KTableWrapper input) { * final Named named, * final Materialized> materialized) */ - checkNotNull(valueJoiner, VALUEJOINER_NAME.toLowerCase()); + final var ko = otherTable.keyType(); final var vo = otherTable.valueType(); final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vo, v), false); - checkType("Join table keyType", otherTable.keyType().userType(), equalTo(k)); + checkType("Join table keyType", ko, equalTo(k)); checkFunction(VALUEJOINER_NAME, valueJoiner, subOf(vr), vr, superOf(v), superOf(vo)); final var kvStore = validateKeyValueStore(store, k, vr);