Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
quentin-quix committed Feb 4, 2025
1 parent 7d43b6a commit ef1a991
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 46 deletions.
2 changes: 1 addition & 1 deletion quixstreams/dataframe/windows/sliding.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def process_window(
# Sliding windows are inclusive on both ends, so values with
# timestamps equal to latest_timestamp - duration - grace
# are still eligible for processing.
state_ts = state.get_highest_id() or 0
state_ts = state.get_latest_timestamp() or 0
latest_timestamp = max(timestamp_ms, state_ts)
max_expired_window_end = latest_timestamp - grace - 1
max_expired_window_start = max_expired_window_end - duration
Expand Down
2 changes: 1 addition & 1 deletion quixstreams/dataframe/windows/time_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def process_window(
step_ms=self._step_ms,
)

state_ts = state.get_highest_id() or 0
state_ts = state.get_latest_timestamp() or 0
latest_timestamp = max(timestamp_ms, state_ts)
max_expired_window_end = latest_timestamp - grace_ms
max_expired_window_start = max_expired_window_end - duration_ms
Expand Down
10 changes: 5 additions & 5 deletions quixstreams/state/rocksdb/windowed/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,16 @@ def delete_from_collection(self, end: int) -> None:
"""
return self._transaction.delete_from_collection(end=end, prefix=self._prefix)

def get_highest_id(self) -> Optional[int]:
def get_latest_timestamp(self) -> Optional[int]:
"""
Get the highest observed message ID for the current message key.
Get the latest observed timestamp for the current message key.
Use this ID to determine if the arriving event is late and should be
Use this timestamp to determine if the arriving event is late and should be
discarded from the processing.
:return: latest observed event ID
:return: latest observed timestamp
"""
return self._transaction.get_highest_id(prefix=self._prefix)
return self._transaction.get_latest_timestamp(prefix=self._prefix)

def expire_windows(
self,
Expand Down
38 changes: 20 additions & 18 deletions quixstreams/state/rocksdb/windowed/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ def as_state(self, prefix: Any = DEFAULT_PREFIX) -> WindowedTransactionState: #
),
)

def get_highest_id(self, prefix: bytes) -> Optional[int]:
return self._get_highest_id(
def get_latest_timestamp(self, prefix: bytes) -> Optional[int]:
return self._get_latest_timestamp(
prefix=prefix, cache=self._latest_timestamps, default=0
)

Expand Down Expand Up @@ -128,17 +128,17 @@ def update_window(

key = encode_integer_pair(start_ms, end_ms)
self.set(key=key, value=value, prefix=prefix)
latest_timestamp_ms = self.get_highest_id(prefix=prefix)
latest_timestamp_ms = self.get_latest_timestamp(prefix=prefix)
updated_timestamp_ms = (
max(latest_timestamp_ms, timestamp_ms)
if latest_timestamp_ms is not None
else timestamp_ms
)

self._set_highest_id(
self._set_latest_timestamp(
cache=self._latest_timestamps,
prefix=prefix,
id=updated_timestamp_ms,
timestamp_ms=updated_timestamp_ms,
)

def add_to_collection(
Expand All @@ -164,7 +164,7 @@ def get_from_collection(self, start: int, end: int, prefix: bytes) -> list[Any]:

def delete_from_collection(self, end: int, prefix: bytes) -> None:
start = (
self._get_highest_id(
self._get_latest_timestamp(
cache=self._last_deleted_value_timestamps, prefix=prefix
)
or -1
Expand All @@ -180,10 +180,10 @@ def delete_from_collection(self, end: int, prefix: bytes) -> None:
self.delete(key=key, prefix=prefix, cf_name=VALUES_CF_NAME)

if last_deleted_id is not None:
self._set_highest_id(
self._set_latest_timestamp(
cache=self._last_deleted_value_timestamps,
prefix=prefix,
id=last_deleted_id,
timestamp_ms=last_deleted_id,
)

def delete_window(self, start_ms: int, end_ms: int, prefix: bytes):
Expand Down Expand Up @@ -233,7 +233,7 @@ def expire_windows(
start_from = -1

# Find the latest start timestamp of the expired windows for the given key
last_expired = self._get_highest_id(
last_expired = self._get_latest_timestamp(
cache=self._last_expired_timestamps, prefix=prefix
)
if last_expired is not None:
Expand All @@ -253,10 +253,10 @@ def expire_windows(
latest_window = expired_windows[-1]
last_expired__gt = latest_window[0][0]

self._set_highest_id(
self._set_latest_timestamp(
cache=self._last_expired_timestamps,
prefix=prefix,
id=last_expired__gt,
timestamp_ms=last_expired__gt,
)

# Collect values into windows
Expand Down Expand Up @@ -318,7 +318,7 @@ def delete_windows(
start_from = -1

# Find the latest start timestamp of the deleted windows for the given key
last_deleted = self._get_highest_id(
last_deleted = self._get_latest_timestamp(
cache=self._last_deleted_window_timestamps, prefix=prefix
)
if last_deleted is not None:
Expand All @@ -337,10 +337,10 @@ def delete_windows(

# Save the start of the latest deleted window to the deletion index
if last_deleted__gt:
self._set_highest_id(
self._set_latest_timestamp(
cache=self._last_deleted_window_timestamps,
prefix=prefix,
id=last_deleted__gt,
timestamp_ms=last_deleted__gt,
)

if delete_values:
Expand Down Expand Up @@ -445,7 +445,7 @@ def _get_items(
# Sort and deserialize items merged from the cache and store
return sorted(merged_items.items(), key=lambda kv: kv[0], reverse=backwards)

def _get_highest_id(
def _get_latest_timestamp(
self, cache: TimestampsCache, prefix: bytes, default: Any = None
) -> Optional[int]:
cached_ts = cache.timestamps.get(prefix)
Expand All @@ -464,11 +464,13 @@ def _get_highest_id(
cache.timestamps[prefix] = stored_ts
return stored_ts

def _set_highest_id(self, cache: TimestampsCache, prefix: bytes, id: int):
cache.timestamps[prefix] = id
def _set_latest_timestamp(
self, cache: TimestampsCache, prefix: bytes, timestamp_ms: int
):
cache.timestamps[prefix] = timestamp_ms
self.set(
key=cache.key,
value=id,
value=timestamp_ms,
prefix=prefix,
cf_name=cache.cf_name,
)
Expand Down
11 changes: 5 additions & 6 deletions quixstreams/state/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def delete_from_collection(self, end: int) -> None:
"""
...

def get_highest_id(self) -> Optional[int]:
def get_latest_timestamp(self) -> Optional[int]:
"""
Get the latest observed message ID for the current state prefix
(same as message key).
Expand Down Expand Up @@ -300,15 +300,14 @@ def delete_from_collection(self, end: int) -> None:
"""
...

def get_highest_id(self, prefix: bytes) -> int:
def get_latest_timestamp(self, prefix: bytes) -> int:
"""
Get the latest observed message ID for the current state prefix
(same as message key).
Get the latest observed timestamp for the current message key.
Use this ID to determine if the arriving event is late and should be
Use this timestamp to determine if the arriving event is late and should be
discarded from the processing.
:return: latest observed event ID
:return: latest observed timestamp
"""
...

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def test_expire_windows(transaction_state, delete):

with transaction_state() as state:
state.update_window(start_ms=20, end_ms=30, value=3, timestamp_ms=20)
max_start_time = state.get_highest_id() - duration_ms
max_start_time = state.get_latest_timestamp() - duration_ms
expired = state.expire_windows(max_start_time=max_start_time, delete=delete)
# "expire_windows" must update the expiration index so that the same
# windows are not expired twice
Expand Down Expand Up @@ -96,7 +96,7 @@ def test_expire_windows_with_collect(transaction_state, end_inclusive):

with transaction_state() as state:
state.update_window(start_ms=20, end_ms=30, value=None, timestamp_ms=20)
max_start_time = state.get_highest_id() - duration_ms
max_start_time = state.get_latest_timestamp() - duration_ms
expired = state.expire_windows(
max_start_time=max_start_time,
collect=True,
Expand All @@ -122,14 +122,14 @@ def test_same_keys_in_db_and_update_cache(transaction_state):
state.update_window(start_ms=0, end_ms=10, value=3, timestamp_ms=8)

state.update_window(start_ms=10, end_ms=20, value=2, timestamp_ms=10)
max_start_time = state.get_highest_id() - duration_ms
max_start_time = state.get_latest_timestamp() - duration_ms
expired = state.expire_windows(max_start_time=max_start_time)

# Value from the cache takes precedence over the value in the db
assert expired == [((0, 10), 3)]


def test_get_highest_id(windowed_rocksdb_store_factory):
def test_get_latest_timestamp(windowed_rocksdb_store_factory):
store = windowed_rocksdb_store_factory()
partition = store.assign_partition(0)
timestamp = 123
Expand All @@ -141,7 +141,7 @@ def test_get_highest_id(windowed_rocksdb_store_factory):

partition = store.assign_partition(0)
with partition.begin() as tx:
assert tx.get_highest_id(prefix=prefix) == timestamp
assert tx.get_latest_timestamp(prefix=prefix) == timestamp


@pytest.mark.parametrize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def test_expire_windows_expired(self, windowed_rocksdb_store_factory, delete):
tx.update_window(
start_ms=20, end_ms=30, value=3, timestamp_ms=20, prefix=prefix
)
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms
max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms
expired = tx.expire_windows(
max_start_time=max_start_time, prefix=prefix, delete=delete
)
Expand Down Expand Up @@ -112,7 +112,7 @@ def test_expire_windows_cached(self, windowed_rocksdb_store_factory, delete):
tx.update_window(
start_ms=20, end_ms=30, value=3, timestamp_ms=20, prefix=prefix
)
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms
max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms
expired = tx.expire_windows(
max_start_time=max_start_time, prefix=prefix, delete=delete
)
Expand Down Expand Up @@ -156,7 +156,7 @@ def test_expire_windows_empty(self, windowed_rocksdb_store_factory):
tx.update_window(
start_ms=3, end_ms=13, value=1, timestamp_ms=3, prefix=prefix
)
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms
max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms
assert not tx.expire_windows(max_start_time=max_start_time, prefix=prefix)

def test_expire_windows_with_grace_expired(self, windowed_rocksdb_store_factory):
Expand All @@ -175,7 +175,9 @@ def test_expire_windows_with_grace_expired(self, windowed_rocksdb_store_factory)
tx.update_window(
start_ms=15, end_ms=25, value=1, timestamp_ms=15, prefix=prefix
)
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms - grace_ms
max_start_time = (
tx.get_latest_timestamp(prefix=prefix) - duration_ms - grace_ms
)
expired = tx.expire_windows(max_start_time=max_start_time, prefix=prefix)

assert len(expired) == 1
Expand All @@ -197,7 +199,9 @@ def test_expire_windows_with_grace_empty(self, windowed_rocksdb_store_factory):
tx.update_window(
start_ms=13, end_ms=23, value=1, timestamp_ms=13, prefix=prefix
)
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms - grace_ms
max_start_time = (
tx.get_latest_timestamp(prefix=prefix) - duration_ms - grace_ms
)
expired = tx.expire_windows(max_start_time=max_start_time, prefix=prefix)

assert not expired
Expand Down Expand Up @@ -276,7 +280,7 @@ def test_expire_windows_no_expired(self, windowed_rocksdb_store_factory):
)
# "expire_windows" must update the expiration index so that the same
# windows are not expired twice
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms
max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms
assert not tx.expire_windows(max_start_time=max_start_time, prefix=prefix)

def test_expire_windows_multiple_windows(self, windowed_rocksdb_store_factory):
Expand All @@ -302,7 +306,7 @@ def test_expire_windows_multiple_windows(self, windowed_rocksdb_store_factory):
)
# "expire_windows" must update the expiration index so that the same
# windows are not expired twice
max_start_time = tx.get_highest_id(prefix=prefix) - duration_ms
max_start_time = tx.get_latest_timestamp(prefix=prefix) - duration_ms
expired = tx.expire_windows(max_start_time=max_start_time, prefix=prefix)

assert len(expired) == 3
Expand All @@ -319,7 +323,7 @@ def test_get_highest_id_update(self, windowed_rocksdb_store_factory):
tx.update_window(0, 10, value=1, timestamp_ms=timestamp, prefix=prefix)

with partition.begin() as tx:
assert tx.get_highest_id(prefix=prefix) == timestamp
assert tx.get_latest_timestamp(prefix=prefix) == timestamp

def test_get_highest_id_cannot_go_backwards(self, windowed_rocksdb_store_factory):
store = windowed_rocksdb_store_factory()
Expand All @@ -329,10 +333,10 @@ def test_get_highest_id_cannot_go_backwards(self, windowed_rocksdb_store_factory
with partition.begin() as tx:
tx.update_window(0, 10, value=1, timestamp_ms=timestamp, prefix=prefix)
tx.update_window(0, 10, value=1, timestamp_ms=timestamp - 1, prefix=prefix)
assert tx.get_highest_id(prefix=prefix) == timestamp
assert tx.get_latest_timestamp(prefix=prefix) == timestamp

with partition.begin() as tx:
assert tx.get_highest_id(prefix=prefix) == timestamp
assert tx.get_latest_timestamp(prefix=prefix) == timestamp

def test_update_window_and_prepare(
self, windowed_rocksdb_partition_factory, changelog_producer_mock
Expand Down

0 comments on commit ef1a991

Please sign in to comment.