From 5ba7bfaa5786294b1522be6e85e7da64b0b09590 Mon Sep 17 00:00:00 2001
From: Hanyu Zheng <135176127+hanyuzheng7@users.noreply.github.com>
Date: Thu, 7 Dec 2023 15:29:29 -0800
Subject: [PATCH] KAFKA-15629: Support ResultOrder to TimestampedRangeQuery.
(#14907)
Update to KIP-992.
Reviewers: Matthias J. Sax
---
docs/streams/upgrade-guide.html | 1 -
.../streams/query/TimestampedRangeQuery.java | 33 +++++++++++--------
.../MeteredTimestampedKeyValueStore.java | 7 ++--
3 files changed, 25 insertions(+), 16 deletions(-)
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index f73230a3c46da..fd3b4573ad411 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -143,7 +143,6 @@
-
The non-null key requirements for Kafka Streams join operators were relaxed as part of KIP-962.
The behavior of the following operators changed.
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java b/streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java
index ab0a11b94f03f..cd291638bcbdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java
@@ -44,13 +44,12 @@ public final class TimestampedRangeQuery implements Query lower;
private final Optional upper;
+ private final ResultOrder order;
- private final boolean isKeyAscending;
-
- private TimestampedRangeQuery(final Optional lower, final Optional upper, final boolean isKeyAscending) {
+ private TimestampedRangeQuery(final Optional lower, final Optional upper, final ResultOrder order) {
this.lower = lower;
this.upper = upper;
- this.isKeyAscending = isKeyAscending;
+ this.order = order;
}
/**
@@ -61,7 +60,7 @@ private TimestampedRangeQuery(final Optional lower, final Optional upper,
* @param The value type
*/
public static TimestampedRangeQuery withRange(final K lower, final K upper) {
- return new TimestampedRangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
+ return new TimestampedRangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), ResultOrder.ANY);
}
/**
@@ -72,7 +71,7 @@ public static TimestampedRangeQuery withRange(final K lower, final
* @param The value type
*/
public static TimestampedRangeQuery withUpperBound(final K upper) {
- return new TimestampedRangeQuery<>(Optional.empty(), Optional.of(upper), true);
+ return new TimestampedRangeQuery<>(Optional.empty(), Optional.of(upper), ResultOrder.ANY);
}
/**
@@ -82,16 +81,16 @@ public static TimestampedRangeQuery withUpperBound(final K upper) {
* @param The value type
*/
public static TimestampedRangeQuery withLowerBound(final K lower) {
- return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), true);
+ return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), ResultOrder.ANY);
}
/**
- * Determines if the serialized byte[] of the keys in ascending order.
+ * Determines if the serialized byte[] of the keys in ascending or descending or unordered order.
* Order is based on the serialized byte[] of the keys, not the 'logical' key order.
- * @return true if ascending, false otherwise.
+ * @return return the order of return records base on the serialized byte[] of the keys (can be unordered, or in ascending, or in descending order).
*/
- public boolean isKeyAscending() {
- return isKeyAscending;
+ public ResultOrder resultOrder() {
+ return order;
}
/**
@@ -100,9 +99,17 @@ public boolean isKeyAscending() {
* @return a new RangeQuery instance with descending flag set.
*/
public TimestampedRangeQuery withDescendingKeys() {
- return new TimestampedRangeQuery<>(this.lower, this.upper, false);
+ return new TimestampedRangeQuery<>(this.lower, this.upper, ResultOrder.DESCENDING);
}
+ /**
+ * Set the query to return the serialized byte[] of the keys in ascending order.
+ * Order is based on the serialized byte[] of the keys, not the 'logical' key order.
+ * @return a new RangeQuery instance with ascending flag set.
+ */
+ public TimestampedRangeQuery withAscendingKeys() {
+ return new TimestampedRangeQuery<>(this.lower, this.upper, ResultOrder.ASCENDING);
+ }
/**
* Interactive scan query that returns all records in the store.
@@ -110,7 +117,7 @@ public TimestampedRangeQuery withDescendingKeys() {
* @param The value type
*/
public static TimestampedRangeQuery withNoBounds() {
- return new TimestampedRangeQuery<>(Optional.empty(), Optional.empty(), true);
+ return new TimestampedRangeQuery<>(Optional.empty(), Optional.empty(), ResultOrder.ANY);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index bf4ba871b29b4..b5c774fef3c8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -206,14 +206,17 @@ private QueryResult runTimestampedRangeQuery(final Query query,
final QueryResult result;
final TimestampedRangeQuery typedQuery = (TimestampedRangeQuery) query;
RangeQuery rawRangeQuery;
- final boolean isKeyAscending = typedQuery.isKeyAscending();
+ final ResultOrder order = typedQuery.resultOrder();
rawRangeQuery = RangeQuery.withRange(
keyBytes(typedQuery.lowerBound().orElse(null)),
keyBytes(typedQuery.upperBound().orElse(null))
);
- if (!isKeyAscending) {
+ if (order.equals(ResultOrder.DESCENDING)) {
rawRangeQuery = rawRangeQuery.withDescendingKeys();
}
+ if (order.equals(ResultOrder.ASCENDING)) {
+ rawRangeQuery = rawRangeQuery.withAscendingKeys();
+ }
final QueryResult> rawResult =
wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {