Skip to content

Commit

Permalink
Fix stream-stream join validation and align other checks
Browse files Browse the repository at this point in the history
  • Loading branch information
jeroenvandisseldorp committed Jan 24, 2024
1 parent d382bcd commit ac5945c
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 28 deletions.
25 changes: 17 additions & 8 deletions ksml/src/main/java/io/axual/ksml/operation/JoinOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,16 @@ public StreamWrapper apply(KStreamWrapper input) {
final var v = input.valueType();

if (joinStream instanceof KStreamWrapper otherStream) {
/* Kafka Streams method signature:
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final StreamJoined<K, V, VO> streamJoined);
*/
final var ko = otherStream.keyType();
final var vo = otherStream.valueType();
final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vo, v), false);
checkType("Join stream keyType", vo, equalTo(k));
checkType("Join stream keyType", ko, equalTo(k));
checkFunction(VALUEJOINER_NAME, valueJoiner, vr, superOf(v), superOf(vo));
var joined = StreamJoined.with(k.getSerde(), v.getSerde(), vo.getSerde());
if (name != null) joined = joined.withName(name);
Expand All @@ -100,9 +107,10 @@ public StreamWrapper apply(KStreamWrapper input) {
* final Joined<K, V, VT> joined)
*/

final var kt = otherTable.keyType();
final var vt = otherTable.valueType();
final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vt, v), false);
checkType("Join table keyType", otherTable.keyType(), equalTo(k));
checkType("Join table keyType", kt, equalTo(k));
checkFunction(VALUEJOINER_NAME, valueJoiner, vr, superOf(v), superOf(vt));
var joined = Joined.with(k.getSerde(), v.getSerde(), vt.getSerde());
if (name != null) joined = joined.withName(name);
Expand All @@ -123,7 +131,7 @@ public StreamWrapper apply(KStreamWrapper input) {
final var gk = otherGlobalKTable.keyType();
final var gv = otherGlobalKTable.valueType();
final var rv = streamDataTypeOf(firstSpecificType(valueJoiner, gv, v), false);
checkType("Join globalKTable keyType", otherGlobalKTable.keyType(), equalTo(k));
checkType("Join globalKTable keyType", gk, equalTo(k));
checkFunction(KEYSELECTOR_NAME, keyValueMapper, subOf(gk), gk, superOf(k), superOf(v));
checkFunction(VALUEJOINER_NAME, valueJoiner, rv, superOf(v), superOf(gv));
final var output = name != null
Expand All @@ -148,7 +156,7 @@ public StreamWrapper apply(KTableWrapper input) {
final var k = input.keyType();
final var v = input.valueType();

if (joinStream instanceof KTableWrapper kTableWrapper) {
if (joinStream instanceof KTableWrapper otherTable) {
/* Kafka Streams method signature:
* <VO, VR> KTable<K, VR> join(
* final KTable<K, VO> other,
Expand All @@ -157,16 +165,17 @@ public StreamWrapper apply(KTableWrapper input) {
* final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)
*/

final var vo = kTableWrapper.valueType();
final var ko = otherTable.keyType();
final var vo = otherTable.valueType();
final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vo, v), false);
checkType("Join table keyType", kTableWrapper.keyType(), equalTo(k));
checkType("Join table keyType", ko, equalTo(k));
checkFunction(VALUEJOINER_NAME, valueJoiner, subOf(vr), vr, superOf(v), superOf(vo));
final var kvStore = validateKeyValueStore(store, k, vr);
if (kvStore != null) {
final var mat = materialize(kvStore);
final var output = name != null
? input.table.join(kTableWrapper.table, new UserValueJoiner(valueJoiner), Named.as(name), mat)
: input.table.join(kTableWrapper.table, new UserValueJoiner(valueJoiner), mat);
? input.table.join(otherTable.table, new UserValueJoiner(valueJoiner), Named.as(name), mat)
: input.table.join(otherTable.table, new UserValueJoiner(valueJoiner), mat);
return new KTableWrapper(output, k, vr);
}
}
Expand Down
41 changes: 24 additions & 17 deletions ksml/src/main/java/io/axual/ksml/operation/LeftJoinOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,17 @@ public StreamWrapper apply(KStreamWrapper input) {
final var k = input.keyType();
final var v = input.valueType();

if (joinStream instanceof KStreamWrapper kStreamWrapper) {
final var vo = kStreamWrapper.valueType();
if (joinStream instanceof KStreamWrapper otherStream) {
/* Kafka Streams method signature:
* <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
* final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
* final JoinWindows windows,
* final StreamJoined<K, V, VO> streamJoined);
*/
final var ko = otherStream.keyType();
final var vo = otherStream.valueType();
final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vo, v), false);
checkType("Join stream keyType", kStreamWrapper.keyType(), equalTo(k));
checkType("Join stream keyType", ko, equalTo(k));
checkFunction(VALUEJOINER_NAME, valueJoiner, vr, superOf(v), superOf(vo));
var joined = StreamJoined.with(k.getSerde(), v.getSerde(), vo.getSerde());
if (name != null) joined = joined.withName(name);
Expand All @@ -77,29 +84,29 @@ public StreamWrapper apply(KStreamWrapper input) {
}

final var output = (KStream) input.stream.leftJoin(
kStreamWrapper.stream,
otherStream.stream,
new UserValueJoiner(valueJoiner),
JoinWindows.ofTimeDifferenceWithNoGrace(joinWindowsDuration),
joined);
return new KStreamWrapper(output, k, vr);
}

if (joinStream instanceof KTableWrapper kTableWrapper) {
if (joinStream instanceof KTableWrapper otherTable) {
/* Kafka Streams method signature:
* <VT, VR> KStream<K, VR> leftJoin(
* final KTable<K, VT> table,
* final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
* final Joined<K, V, VT> joined)
*/

final var vt = kTableWrapper.valueType();
final var kt = otherTable.keyType();
final var vt = otherTable.valueType();
final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vt, v), false);
checkType("Join table keyType", kTableWrapper.keyType(), equalTo(k));
checkType("Join table keyType", kt, equalTo(k));
checkFunction(VALUEJOINER_NAME, valueJoiner, vr, superOf(v), superOf(vt));
var joined = Joined.with(k.getSerde(), v.getSerde(), vt.getSerde());
if (name != null) joined = joined.withName(name);
final var output = (KStream) input.stream.leftJoin(
kTableWrapper.table,
otherTable.table,
new UserValueJoiner(valueJoiner),
joined);
return new KStreamWrapper(output, k, vr);
Expand All @@ -113,31 +120,31 @@ public StreamWrapper apply(KTableWrapper input) {
final var k = input.keyType();
final var v = input.valueType();

if (joinStream instanceof KTableWrapper kTableWrapper) {
if (joinStream instanceof KTableWrapper otherTable) {
/* Kafka Streams method signature:
* <VO, VR> KTable<K, VR> leftJoin(
* final KTable<K, VO> other,
* final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
* final Named named,
* final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)
*/

final var vo = kTableWrapper.valueType();
final var ko = otherTable.keyType();
final var vo = otherTable.valueType();
final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vo, v), false);
checkType("Join table keyType", kTableWrapper.keyType(), equalTo(k));
checkType("Join table keyType", ko, equalTo(k));
checkFunction(VALUEJOINER_NAME, valueJoiner, subOf(vr), vr, superOf(v), superOf(vo));
final var kvStore = validateKeyValueStore(store, k, vr);
if (kvStore != null) {
final var mat = materialize(kvStore);
final var output = name != null
? input.table.leftJoin(kTableWrapper.table, new UserValueJoiner(valueJoiner), Named.as(name), mat)
: input.table.leftJoin(kTableWrapper.table, new UserValueJoiner(valueJoiner), mat);
? input.table.leftJoin(otherTable.table, new UserValueJoiner(valueJoiner), Named.as(name), mat)
: input.table.leftJoin(otherTable.table, new UserValueJoiner(valueJoiner), mat);
return new KTableWrapper(output, k, vr);
}

final var output = name != null
? (KTable) input.table.leftJoin(kTableWrapper.table, new UserValueJoiner(valueJoiner), Named.as(name))
: (KTable) input.table.leftJoin(kTableWrapper.table, new UserValueJoiner(valueJoiner));
? (KTable) input.table.leftJoin(otherTable.table, new UserValueJoiner(valueJoiner), Named.as(name))
: (KTable) input.table.leftJoin(otherTable.table, new UserValueJoiner(valueJoiner));
return new KTableWrapper(output, k, vr);
}

Expand Down
13 changes: 10 additions & 3 deletions ksml/src/main/java/io/axual/ksml/operation/OuterJoinOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,16 @@ public StreamWrapper apply(KStreamWrapper input) {
final var v = input.valueType();

if (joinStream instanceof KStreamWrapper otherStream) {
/* Kafka Streams method signature:
* <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
* final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
* final JoinWindows windows,
* final StreamJoined<K, V, VO> streamJoined);
*/
final var ko = otherStream.keyType();
final var vo = otherStream.valueType();
final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vo, v), false);
checkType("Join stream keyType", otherStream.keyType().userType().dataType(), equalTo(k));
checkType("Join stream keyType", ko, equalTo(k));
checkFunction(VALUEJOINER_NAME, valueJoiner, vr, superOf(v), superOf(vo));
var joined = StreamJoined.with(k.getSerde(), v.getSerde(), vo.getSerde());
if (name != null) joined = joined.withName(name);
Expand Down Expand Up @@ -88,11 +95,11 @@ public StreamWrapper apply(KTableWrapper input) {
* final Named named,
* final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)
*/

checkNotNull(valueJoiner, VALUEJOINER_NAME.toLowerCase());
final var ko = otherTable.keyType();
final var vo = otherTable.valueType();
final var vr = streamDataTypeOf(firstSpecificType(valueJoiner, vo, v), false);
checkType("Join table keyType", otherTable.keyType().userType(), equalTo(k));
checkType("Join table keyType", ko, equalTo(k));
checkFunction(VALUEJOINER_NAME, valueJoiner, subOf(vr), vr, superOf(v), superOf(vo));
final var kvStore = validateKeyValueStore(store, k, vr);

Expand Down

0 comments on commit ac5945c

Please sign in to comment.