Skip to content

Commit

Permalink
KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. (apache#14907
Browse files Browse the repository at this point in the history
)

Update to KIP-992.

Reviewers: Matthias J. Sax <matthias@confluent.io>
  • Loading branch information
hanyuzheng7 authored Dec 7, 2023
1 parent 02915a2 commit 5ba7bfa
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 16 deletions.
1 change: 0 additions & 1 deletion docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ <h3><a id="streams_api_changes_370" href="#streams_api_changes_370">Streams API
namely <code>TimestampedKeyQuery</code> and <code>TimestampedRangeQuery</code>. Both should be used to query a timestamped key-value store, to retrieve a <code>ValueAndTimestamp</code> result.
The existing <code>KeyQuery</code> and <code>RangeQuery</code> are changed to always return the value only for timestamped key-value stores.
</p>

<p>
The non-null key requirements for Kafka Streams join operators were relaxed as part of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams">KIP-962</a>.
The behavior of the following operators changed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,12 @@ public final class TimestampedRangeQuery<K, V> implements Query<KeyValueIterator

private final Optional<K> lower;
private final Optional<K> upper;
private final ResultOrder order;

private final boolean isKeyAscending;

private TimestampedRangeQuery(final Optional<K> lower, final Optional<K> upper, final boolean isKeyAscending) {
private TimestampedRangeQuery(final Optional<K> lower, final Optional<K> upper, final ResultOrder order) {
this.lower = lower;
this.upper = upper;
this.isKeyAscending = isKeyAscending;
this.order = order;
}

/**
Expand All @@ -61,7 +60,7 @@ private TimestampedRangeQuery(final Optional<K> lower, final Optional<K> upper,
* @param <V> The value type
*/
public static <K, V> TimestampedRangeQuery<K, V> withRange(final K lower, final K upper) {
return new TimestampedRangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
return new TimestampedRangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), ResultOrder.ANY);
}

/**
Expand All @@ -72,7 +71,7 @@ public static <K, V> TimestampedRangeQuery<K, V> withRange(final K lower, final
* @param <V> The value type
*/
public static <K, V> TimestampedRangeQuery<K, V> withUpperBound(final K upper) {
return new TimestampedRangeQuery<>(Optional.empty(), Optional.of(upper), true);
return new TimestampedRangeQuery<>(Optional.empty(), Optional.of(upper), ResultOrder.ANY);
}

/**
Expand All @@ -82,16 +81,16 @@ public static <K, V> TimestampedRangeQuery<K, V> withUpperBound(final K upper) {
* @param <V> The value type
*/
public static <K, V> TimestampedRangeQuery<K, V> withLowerBound(final K lower) {
return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), true);
return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), ResultOrder.ANY);
}

/**
* Determines if the serialized byte[] of the keys in ascending order.
* Determines if the serialized byte[] of the keys in ascending or descending or unordered order.
* Order is based on the serialized byte[] of the keys, not the 'logical' key order.
* @return true if ascending, false otherwise.
* @return return the order of return records base on the serialized byte[] of the keys (can be unordered, or in ascending, or in descending order).
*/
public boolean isKeyAscending() {
return isKeyAscending;
public ResultOrder resultOrder() {
return order;
}

/**
Expand All @@ -100,17 +99,25 @@ public boolean isKeyAscending() {
* @return a new RangeQuery instance with descending flag set.
*/
public TimestampedRangeQuery<K, V> withDescendingKeys() {
return new TimestampedRangeQuery<>(this.lower, this.upper, false);
return new TimestampedRangeQuery<>(this.lower, this.upper, ResultOrder.DESCENDING);
}

/**
* Set the query to return the serialized byte[] of the keys in ascending order.
* Order is based on the serialized byte[] of the keys, not the 'logical' key order.
* @return a new RangeQuery instance with ascending flag set.
*/
public TimestampedRangeQuery<K, V> withAscendingKeys() {
return new TimestampedRangeQuery<>(this.lower, this.upper, ResultOrder.ASCENDING);
}

/**
* Interactive scan query that returns all records in the store.
* @param <K> The key type
* @param <V> The value type
*/
public static <K, V> TimestampedRangeQuery<K, V> withNoBounds() {
return new TimestampedRangeQuery<>(Optional.empty(), Optional.empty(), true);
return new TimestampedRangeQuery<>(Optional.empty(), Optional.empty(), ResultOrder.ANY);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,17 @@ private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
final QueryResult<R> result;
final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query;
RangeQuery<Bytes, byte[]> rawRangeQuery;
final boolean isKeyAscending = typedQuery.isKeyAscending();
final ResultOrder order = typedQuery.resultOrder();
rawRangeQuery = RangeQuery.withRange(
keyBytes(typedQuery.lowerBound().orElse(null)),
keyBytes(typedQuery.upperBound().orElse(null))
);
if (!isKeyAscending) {
if (order.equals(ResultOrder.DESCENDING)) {
rawRangeQuery = rawRangeQuery.withDescendingKeys();
}
if (order.equals(ResultOrder.ASCENDING)) {
rawRangeQuery = rawRangeQuery.withAscendingKeys();
}
final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {
Expand Down

0 comments on commit 5ba7bfa

Please sign in to comment.