Streams API
namely TimestampedKeyQuery
and TimestampedRangeQuery
. Both should be used to query a timestamped key-value store, to retrieve a ValueAndTimestamp
result.
The existing KeyQuery
and RangeQuery
are changed to always return the value only for timestamped key-value stores.
-
The non-null key requirements for Kafka Streams join operators were relaxed as part of KIP-962.
The behavior of the following operators changed.
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java
index ab0a11b94f03f..cd291638bcbdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java
@@ -44,13 +44,12 @@ public final class TimestampedRangeQuery implements Query lower;
private final Optional upper;
+ private final ResultOrder order;
- private final boolean isKeyAscending;
-
- private TimestampedRangeQuery(final Optional lower, final Optional upper, final boolean isKeyAscending) {
+ private TimestampedRangeQuery(final Optional lower, final Optional upper, final ResultOrder order) {
this.lower = lower;
this.upper = upper;
- this.isKeyAscending = isKeyAscending;
+ this.order = order;
}
/**
@@ -61,7 +60,7 @@ private TimestampedRangeQuery(final Optional lower, final Optional upper,
* @param The value type
*/
public static TimestampedRangeQuery withRange(final K lower, final K upper) {
- return new TimestampedRangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
+ return new TimestampedRangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), ResultOrder.ANY);
}
/**
@@ -72,7 +71,7 @@ public static TimestampedRangeQuery withRange(final K lower, final
* @param The value type
*/
public static TimestampedRangeQuery withUpperBound(final K upper) {
- return new TimestampedRangeQuery<>(Optional.empty(), Optional.of(upper), true);
+ return new TimestampedRangeQuery<>(Optional.empty(), Optional.of(upper), ResultOrder.ANY);
}
/**
@@ -82,16 +81,16 @@ public static TimestampedRangeQuery withUpperBound(final K upper) {
* @param The value type
*/
public static TimestampedRangeQuery withLowerBound(final K lower) {
- return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), true);
+ return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), ResultOrder.ANY);
}
/**
- * Determines if the serialized byte[] of the keys in ascending order.
+ * Determines if the serialized byte[] of the keys in ascending or descending or unordered order.
* Order is based on the serialized byte[] of the keys, not the 'logical' key order.
- * @return true if ascending, false otherwise.
+ * @return return the order of return records base on the serialized byte[] of the keys (can be unordered, or in ascending, or in descending order).
*/
- public boolean isKeyAscending() {
- return isKeyAscending;
+ public ResultOrder resultOrder() {
+ return order;
}
/**
@@ -100,9 +99,17 @@ public boolean isKeyAscending() {
* @return a new RangeQuery instance with descending flag set.
*/
public TimestampedRangeQuery withDescendingKeys() {
- return new TimestampedRangeQuery<>(this.lower, this.upper, false);
+ return new TimestampedRangeQuery<>(this.lower, this.upper, ResultOrder.DESCENDING);
}
+ /**
+ * Set the query to return the serialized byte[] of the keys in ascending order.
+ * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
+ * @return a new RangeQuery instance with ascending flag set.
+ */
+ public TimestampedRangeQuery withAscendingKeys() {
+ return new TimestampedRangeQuery<>(this.lower, this.upper, ResultOrder.ASCENDING);
+ }
/**
* Interactive scan query that returns all records in the store.
@@ -110,7 +117,7 @@ public TimestampedRangeQuery withDescendingKeys() {
* @param The value type
*/
public static TimestampedRangeQuery withNoBounds() {
- return new TimestampedRangeQuery<>(Optional.empty(), Optional.empty(), true);
+ return new TimestampedRangeQuery<>(Optional.empty(), Optional.empty(), ResultOrder.ANY);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index bf4ba871b29b4..b5c774fef3c8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -206,14 +206,17 @@ private QueryResult runTimestampedRangeQuery(final Query query,
final QueryResult result;
final TimestampedRangeQuery typedQuery = (TimestampedRangeQuery) query;
RangeQuery rawRangeQuery;
- final boolean isKeyAscending = typedQuery.isKeyAscending();
+ final ResultOrder order = typedQuery.resultOrder();
rawRangeQuery = RangeQuery.withRange(
keyBytes(typedQuery.lowerBound().orElse(null)),
keyBytes(typedQuery.upperBound().orElse(null))
);
- if (!isKeyAscending) {
+ if (order.equals(ResultOrder.DESCENDING)) {
rawRangeQuery = rawRangeQuery.withDescendingKeys();
}
+ if (order.equals(ResultOrder.ASCENDING)) {
+ rawRangeQuery = rawRangeQuery.withAscendingKeys();
+ }
final QueryResult> rawResult =
wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {