diff --git a/quixstreams/dataframe/windows/sliding.py b/quixstreams/dataframe/windows/sliding.py index 3aac82b7e..b88ce2f54 100644 --- a/quixstreams/dataframe/windows/sliding.py +++ b/quixstreams/dataframe/windows/sliding.py @@ -65,7 +65,7 @@ def process_window( # Sliding windows are inclusive on both ends, so values with # timestamps equal to latest_timestamp - duration - grace # are still eligible for processing. - state_ts = state.get_highest_id() or 0 + state_ts = state.get_latest_timestamp() or 0 latest_timestamp = max(timestamp_ms, state_ts) max_expired_window_end = latest_timestamp - grace - 1 max_expired_window_start = max_expired_window_end - duration diff --git a/quixstreams/dataframe/windows/time_based.py b/quixstreams/dataframe/windows/time_based.py index 7da3b64ae..27f1e6ada 100644 --- a/quixstreams/dataframe/windows/time_based.py +++ b/quixstreams/dataframe/windows/time_based.py @@ -68,7 +68,7 @@ def process_window( step_ms=self._step_ms, ) - state_ts = state.get_highest_id() or 0 + state_ts = state.get_latest_timestamp() or 0 latest_timestamp = max(timestamp_ms, state_ts) max_expired_window_end = latest_timestamp - grace_ms max_expired_window_start = max_expired_window_end - duration_ms diff --git a/quixstreams/state/rocksdb/windowed/state.py b/quixstreams/state/rocksdb/windowed/state.py index 3e99d3b42..543133a41 100644 --- a/quixstreams/state/rocksdb/windowed/state.py +++ b/quixstreams/state/rocksdb/windowed/state.py @@ -105,16 +105,16 @@ def delete_from_collection(self, end: int) -> None: """ return self._transaction.delete_from_collection(end=end, prefix=self._prefix) - def get_highest_id(self) -> Optional[int]: + def get_latest_timestamp(self) -> Optional[int]: """ - Get the highest observed message ID for the current message key. + Get the latest observed timestamp for the current message key. - Use this ID to determine if the arriving event is late and should be + Use this timestamp to determine if the arriving event is late and should be discarded from the processing. - :return: latest observed event ID + :return: latest observed timestamp """ - return self._transaction.get_highest_id(prefix=self._prefix) + return self._transaction.get_latest_timestamp(prefix=self._prefix) def expire_windows( self, diff --git a/quixstreams/state/rocksdb/windowed/transaction.py b/quixstreams/state/rocksdb/windowed/transaction.py index 897c8c036..11cb28562 100644 --- a/quixstreams/state/rocksdb/windowed/transaction.py +++ b/quixstreams/state/rocksdb/windowed/transaction.py @@ -98,8 +98,8 @@ def as_state(self, prefix: Any = DEFAULT_PREFIX) -> WindowedTransactionState: # ), ) - def get_highest_id(self, prefix: bytes) -> Optional[int]: - return self._get_highest_id( + def get_latest_timestamp(self, prefix: bytes) -> Optional[int]: + return self._get_latest_timestamp( prefix=prefix, cache=self._latest_timestamps, default=0 ) @@ -128,17 +128,17 @@ def update_window( key = encode_integer_pair(start_ms, end_ms) self.set(key=key, value=value, prefix=prefix) - latest_timestamp_ms = self.get_highest_id(prefix=prefix) + latest_timestamp_ms = self.get_latest_timestamp(prefix=prefix) updated_timestamp_ms = ( max(latest_timestamp_ms, timestamp_ms) if latest_timestamp_ms is not None else timestamp_ms ) - self._set_highest_id( + self._set_latest_timestamp( cache=self._latest_timestamps, prefix=prefix, - id=updated_timestamp_ms, + timestamp_ms=updated_timestamp_ms, ) def add_to_collection( @@ -164,7 +164,7 @@ def get_from_collection(self, start: int, end: int, prefix: bytes) -> list[Any]: def delete_from_collection(self, end: int, prefix: bytes) -> None: start = ( - self._get_highest_id( + self._get_latest_timestamp( cache=self._last_deleted_value_timestamps, prefix=prefix ) or -1 @@ -180,10 +180,10 @@ def delete_from_collection(self, end: int, prefix: bytes) -> None: self.delete(key=key, prefix=prefix, cf_name=VALUES_CF_NAME) if last_deleted_id is not None: - self._set_highest_id( + self._set_latest_timestamp( cache=self._last_deleted_value_timestamps, prefix=prefix, - id=last_deleted_id, + timestamp_ms=last_deleted_id, ) def delete_window(self, start_ms: int, end_ms: int, prefix: bytes): @@ -233,7 +233,7 @@ def expire_windows( start_from = -1 # Find the latest start timestamp of the expired windows for the given key - last_expired = self._get_highest_id( + last_expired = self._get_latest_timestamp( cache=self._last_expired_timestamps, prefix=prefix ) if last_expired is not None: @@ -253,10 +253,10 @@ def expire_windows( latest_window = expired_windows[-1] last_expired__gt = latest_window[0][0] - self._set_highest_id( + self._set_latest_timestamp( cache=self._last_expired_timestamps, prefix=prefix, - id=last_expired__gt, + timestamp_ms=last_expired__gt, ) # Collect values into windows @@ -318,7 +318,7 @@ def delete_windows( start_from = -1 # Find the latest start timestamp of the deleted windows for the given key - last_deleted = self._get_highest_id( + last_deleted = self._get_latest_timestamp( cache=self._last_deleted_window_timestamps, prefix=prefix ) if last_deleted is not None: @@ -337,10 +337,10 @@ def delete_windows( # Save the start of the latest deleted window to the deletion index if last_deleted__gt: - self._set_highest_id( + self._set_latest_timestamp( cache=self._last_deleted_window_timestamps, prefix=prefix, - id=last_deleted__gt, + timestamp_ms=last_deleted__gt, ) if delete_values: @@ -445,7 +445,7 @@ def _get_items( # Sort and deserialize items merged from the cache and store return sorted(merged_items.items(), key=lambda kv: kv[0], reverse=backwards) - def _get_highest_id( + def _get_latest_timestamp( self, cache: TimestampsCache, prefix: bytes, default: Any = None ) -> Optional[int]: cached_ts = cache.timestamps.get(prefix) @@ -464,11 +464,13 @@ def _get_highest_id( cache.timestamps[prefix] = stored_ts return stored_ts - def _set_highest_id(self, cache: TimestampsCache, prefix: bytes, id: int): - cache.timestamps[prefix] = id + def _set_latest_timestamp( + self, cache: TimestampsCache, prefix: bytes, timestamp_ms: int + ): + cache.timestamps[prefix] = timestamp_ms self.set( key=cache.key, - value=id, + value=timestamp_ms, prefix=prefix, cf_name=cache.cf_name, ) diff --git a/quixstreams/state/types.py b/quixstreams/state/types.py index 52374d69e..54ba57001 100644 --- a/quixstreams/state/types.py +++ b/quixstreams/state/types.py @@ -116,7 +116,7 @@ def delete_from_collection(self, end: int) -> None: """ ... - def get_highest_id(self) -> Optional[int]: + def get_latest_timestamp(self) -> Optional[int]: """ Get the latest observed message ID for the current state prefix (same as message key). @@ -300,15 +300,14 @@ def delete_from_collection(self, end: int) -> None: """ ... - def get_highest_id(self, prefix: bytes) -> int: + def get_latest_timestamp(self, prefix: bytes) -> int: """ - Get the latest observed message ID for the current state prefix - (same as message key). + Get the latest observed timestamp for the current message key. - Use this ID to determine if the arriving event is late and should be + Use this timestamp to determine if the arriving event is late and should be discarded from the processing. - :return: latest observed event ID + :return: latest observed timestamp """ ... diff --git a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_state.py b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_state.py index e5cb6e00e..ab4af0fea 100644 --- a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_state.py +++ b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_state.py @@ -59,7 +59,7 @@ def test_expire_windows(transaction_state, delete): with transaction_state() as state: state.update_window(start_ms=20, end_ms=30, value=3, timestamp_ms=20) - max_start_time = state.get_highest_id() - duration_ms + max_start_time = state.get_latest_timestamp() - duration_ms expired = state.expire_windows(max_start_time=max_start_time, delete=delete) # "expire_windows" must update the expiration index so that the same # windows are not expired twice @@ -96,7 +96,7 @@ def test_expire_windows_with_collect(transaction_state, end_inclusive): with transaction_state() as state: state.update_window(start_ms=20, end_ms=30, value=None, timestamp_ms=20) - max_start_time = state.get_highest_id() - duration_ms + max_start_time = state.get_latest_timestamp() - duration_ms expired = state.expire_windows( max_start_time=max_start_time, collect=True, @@ -122,14 +122,14 @@ def test_same_keys_in_db_and_update_cache(transaction_state): state.update_window(start_ms=0, end_ms=10, value=3, timestamp_ms=8) state.update_window(start_ms=10, end_ms=20, value=2, timestamp_ms=10) - max_start_time = state.get_highest_id() - duration_ms + max_start_time = state.get_latest_timestamp() - duration_ms expired = state.expire_windows(max_start_time=max_start_time) # Value from the cache takes precedence over the value in the db assert expired == [((0, 10), 3)] -def test_get_highest_id(windowed_rocksdb_store_factory): +def test_get_latest_timestamp(windowed_rocksdb_store_factory): store = windowed_rocksdb_store_factory() partition = store.assign_partition(0) timestamp = 123 @@ -141,7 +141,7 @@ def test_get_highest_id(windowed_rocksdb_store_factory): partition = store.assign_partition(0) with partition.begin() as tx: - assert tx.get_highest_id(prefix=prefix) == timestamp + assert tx.get_latest_timestamp(prefix=prefix) == timestamp @pytest.mark.parametrize( diff --git a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_transaction.py b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_transaction.py index 6baf4bbfc..38aee1582 100644 --- a/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_transaction.py +++ b/tests/test_quixstreams/test_state/test_rocksdb/test_windowed/test_transaction.py @@ -62,7 +62,7 @@ def test_expire_windows_expired(self, windowed_rocksdb_store_factory, delete): tx.update_window( start_ms=20, end_ms=30, value=3, timestamp_ms=20, prefix=prefix ) - max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms + max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms expired = tx.expire_windows( max_start_time=max_start_time, prefix=prefix, delete=delete ) @@ -112,7 +112,7 @@ def test_expire_windows_cached(self, windowed_rocksdb_store_factory, delete): tx.update_window( start_ms=20, end_ms=30, value=3, timestamp_ms=20, prefix=prefix ) - max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms + max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms expired = tx.expire_windows( max_start_time=max_start_time, prefix=prefix, delete=delete ) @@ -156,7 +156,7 @@ def test_expire_windows_empty(self, windowed_rocksdb_store_factory): tx.update_window( start_ms=3, end_ms=13, value=1, timestamp_ms=3, prefix=prefix ) - max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms + max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms assert not tx.expire_windows(max_start_time=max_start_time, prefix=prefix) def test_expire_windows_with_grace_expired(self, windowed_rocksdb_store_factory): @@ -175,7 +175,9 @@ def test_expire_windows_with_grace_expired(self, windowed_rocksdb_store_factory) tx.update_window( start_ms=15, end_ms=25, value=1, timestamp_ms=15, prefix=prefix ) - max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms - grace_ms + max_start_time = ( + tx.get_latest_timestamp(prefix=prefix) - duration_ms - grace_ms + ) expired = tx.expire_windows(max_start_time=max_start_time, prefix=prefix) assert len(expired) == 1 @@ -197,7 +199,9 @@ def test_expire_windows_with_grace_empty(self, windowed_rocksdb_store_factory): tx.update_window( start_ms=13, end_ms=23, value=1, timestamp_ms=13, prefix=prefix ) - max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms - grace_ms + max_start_time = ( + tx.get_latest_timestamp(prefix=prefix) - duration_ms - grace_ms + ) expired = tx.expire_windows(max_start_time=max_start_time, prefix=prefix) assert not expired @@ -276,7 +280,7 @@ def test_expire_windows_no_expired(self, windowed_rocksdb_store_factory): ) # "expire_windows" must update the expiration index so that the same # windows are not expired twice - max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms + max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms assert not tx.expire_windows(max_start_time=max_start_time, prefix=prefix) def test_expire_windows_multiple_windows(self, windowed_rocksdb_store_factory): @@ -302,7 +306,7 @@ def test_expire_windows_multiple_windows(self, windowed_rocksdb_store_factory): ) # "expire_windows" must update the expiration index so that the same # windows are not expired twice - max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms + max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms expired = tx.expire_windows(max_start_time=max_start_time, prefix=prefix) assert len(expired) == 3 @@ -319,7 +323,7 @@ def test_get_highest_id_update(self, windowed_rocksdb_store_factory): tx.update_window(0, 10, value=1, timestamp_ms=timestamp, prefix=prefix) with partition.begin() as tx: - assert tx.get_highest_id(prefix=prefix) == timestamp + assert tx.get_latest_timestamp(prefix=prefix) == timestamp def test_get_highest_id_cannot_go_backwards(self, windowed_rocksdb_store_factory): store = windowed_rocksdb_store_factory() @@ -329,10 +333,10 @@ def test_get_highest_id_cannot_go_backwards(self, windowed_rocksdb_store_factory with partition.begin() as tx: tx.update_window(0, 10, value=1, timestamp_ms=timestamp, prefix=prefix) tx.update_window(0, 10, value=1, timestamp_ms=timestamp - 1, prefix=prefix) - assert tx.get_highest_id(prefix=prefix) == timestamp + assert tx.get_latest_timestamp(prefix=prefix) == timestamp with partition.begin() as tx: - assert tx.get_highest_id(prefix=prefix) == timestamp + assert tx.get_latest_timestamp(prefix=prefix) == timestamp def test_update_window_and_prepare( self, windowed_rocksdb_partition_factory, changelog_producer_mock