Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion protos
66 changes: 34 additions & 32 deletions src/streamstore/_lib/s2/v1alpha/s2_pb2.py

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion src/streamstore/_lib/s2/v1alpha/s2_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ class ReadSessionResponse(_message.Message):
) -> None: ...

class StreamConfig(_message.Message):
__slots__ = ("storage_class", "age", "timestamping")
__slots__ = ("storage_class", "age", "timestamping", "delete_on_empty")
class Timestamping(_message.Message):
__slots__ = ("mode", "uncapped")
MODE_FIELD_NUMBER: _ClassVar[int]
Expand All @@ -650,17 +650,26 @@ class StreamConfig(_message.Message):
uncapped: bool = ...,
) -> None: ...

class DeleteOnEmpty(_message.Message):
__slots__ = ("min_age_secs",)
MIN_AGE_SECS_FIELD_NUMBER: _ClassVar[int]
min_age_secs: int
def __init__(self, min_age_secs: _Optional[int] = ...) -> None: ...

STORAGE_CLASS_FIELD_NUMBER: _ClassVar[int]
AGE_FIELD_NUMBER: _ClassVar[int]
TIMESTAMPING_FIELD_NUMBER: _ClassVar[int]
DELETE_ON_EMPTY_FIELD_NUMBER: _ClassVar[int]
storage_class: StorageClass
age: int
timestamping: StreamConfig.Timestamping
delete_on_empty: StreamConfig.DeleteOnEmpty
def __init__(
self,
storage_class: _Optional[_Union[StorageClass, str]] = ...,
age: _Optional[int] = ...,
timestamping: _Optional[_Union[StreamConfig.Timestamping, _Mapping]] = ...,
delete_on_empty: _Optional[_Union[StreamConfig.DeleteOnEmpty, _Mapping]] = ...,
) -> None: ...

class BasinConfig(_message.Message):
Expand Down
11 changes: 11 additions & 0 deletions src/streamstore/_mappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
StreamInfo,
TailOffset,
Timestamp,
Timestamping,
TimestampingMode,
)

_ReadStart = SeqNum | Timestamp | TailOffset
Expand Down Expand Up @@ -125,6 +127,7 @@ def stream_config_message(
storage_class = config.storage_class
retention_age = config.retention_age
timestamping = config.timestamping
delete_on_empty_min_age = config.delete_on_empty_min_age
if storage_class is not None:
paths.append(f"{mask_path_prefix}storage_class")
stream_config.storage_class = msgs.StorageClass(storage_class.value)
Expand All @@ -142,6 +145,9 @@ def stream_config_message(
if timestamping.uncapped is not None:
paths.append(f"{mask_path_prefix}timestamping.uncapped")
stream_config.timestamping.uncapped = timestamping.uncapped
if delete_on_empty_min_age is not None:
paths.append(f"{mask_path_prefix}delete_on_empty.min_age_secs")
stream_config.delete_on_empty.min_age_secs = delete_on_empty_min_age
if return_mask_paths:
return (stream_config, paths)
return stream_config
Expand Down Expand Up @@ -181,6 +187,11 @@ def stream_config_schema(config: msgs.StreamConfig) -> StreamConfig:
return StreamConfig(
StorageClass(config.storage_class),
config.age,
Timestamping(
mode=TimestampingMode(config.timestamping.mode),
uncapped=config.timestamping.uncapped,
),
config.delete_on_empty.min_age_secs,
)


Expand Down
4 changes: 4 additions & 0 deletions src/streamstore/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ class StreamConfig:
retention_age: int | None = None
#: Timestamping behavior for appends to this stream, which influences how timestamps are handled.
timestamping: Timestamping | None = None
#: Minimum age in seconds before this stream can be automatically deleted if empty.
#:
#: If not specified or set to ``0``, this stream will not be automatically deleted.
delete_on_empty_min_age: int | None = None


@dataclass(slots=True)
Expand Down
Loading