Skip to content

Commit 0e2bafe

Browse files
authored
feat: Reduce streaming for pynumaflow_lite (#290)
Signed-off-by: Vigith Maurice <vigith@gmail.com>
1 parent 4a381b5 commit 0e2bafe

File tree

10 files changed

+649
-2
lines changed

10 files changed

+649
-2
lines changed

packages/pynumaflow-lite/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ path = "tests/bin/session_reduce.rs"
4848
name = "test_accumulator"
4949
path = "tests/bin/accumulator.rs"
5050

51+
[[bin]]
52+
name = "test_reducestream"
53+
path = "tests/bin/reducestream.rs"
54+
5155
[[bin]]
5256
name = "test_sink"
5357
path = "tests/bin/sink.rs"

packages/pynumaflow-lite/pynumaflow_lite/__init__.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828
except Exception: # pragma: no cover
2929
session_reducer = None
3030

31+
try:
32+
reducestreamer = _import_module(__name__ + ".reducestreamer")
33+
except Exception: # pragma: no cover
34+
reducestreamer = None
35+
3136
try:
3237
accumulator = _import_module(__name__ + ".accumulator")
3338
except Exception: # pragma: no cover
@@ -43,12 +48,13 @@
4348
except Exception: # pragma: no cover
4449
sourcer = None
4550

46-
# Surface the Python Mapper, BatchMapper, MapStreamer, Reducer, SessionReducer, Accumulator, Sinker, and Sourcer classes under the extension submodules for convenient access
51+
# Surface the Python Mapper, BatchMapper, MapStreamer, Reducer, SessionReducer, ReduceStreamer, Accumulator, Sinker, and Sourcer classes under the extension submodules for convenient access
4752
from ._map_dtypes import Mapper
4853
from ._batchmapper_dtypes import BatchMapper
4954
from ._mapstream_dtypes import MapStreamer
5055
from ._reduce_dtypes import Reducer
5156
from ._session_reduce_dtypes import SessionReducer
57+
from ._reducestreamer_dtypes import ReduceStreamer
5258
from ._accumulator_dtypes import Accumulator
5359
from ._sink_dtypes import Sinker
5460
from ._source_dtypes import Sourcer
@@ -83,6 +89,12 @@
8389
except Exception:
8490
pass
8591

92+
if reducestreamer is not None:
93+
try:
94+
setattr(reducestreamer, "ReduceStreamer", ReduceStreamer)
95+
except Exception:
96+
pass
97+
8698
if accumulator is not None:
8799
try:
88100
setattr(accumulator, "Accumulator", Accumulator)
@@ -102,7 +114,7 @@
102114
pass
103115

104116
# Public API
105-
__all__ = ["mapper", "batchmapper", "mapstreamer", "reducer", "session_reducer", "accumulator", "sinker", "sourcer"]
117+
__all__ = ["mapper", "batchmapper", "mapstreamer", "reducer", "session_reducer", "reducestreamer", "accumulator", "sinker", "sourcer"]
106118

107119
__doc__ = pynumaflow_lite.__doc__
108120
if hasattr(pynumaflow_lite, "__all__"):
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from abc import ABCMeta, abstractmethod
2+
from pynumaflow_lite.reducestreamer import Datum, Message, Metadata
3+
from collections.abc import AsyncIterable, AsyncIterator
4+
5+
6+
class ReduceStreamer(metaclass=ABCMeta):
    """
    Interface for reduce streaming handlers. A new instance will be created per window.

    Unlike regular Reducer which returns all messages at once, ReduceStreamer
    allows you to yield messages incrementally as an async iterator.
    """

    def __call__(self, *args, **kwargs):
        # Calling the instance simply forwards every argument to `handler`.
        return self.handler(*args, **kwargs)

    @abstractmethod
    async def handler(
        self,
        keys: list[str],
        datums: AsyncIterable[Datum],
        md: Metadata,
    ) -> AsyncIterator[Message]:
        """
        Implement this handler; consume `datums` async iterable and yield Messages incrementally.

        Args:
            keys: List of keys for this window
            datums: An async iterator of Datum objects
            md: Metadata containing window information

        Yields:
            Message objects to be sent to the next vertex
        """
        # An `async def` without a `yield` is a coroutine function, which type
        # checkers read as "coroutine returning AsyncIterator" and which does not
        # match async-generator overrides. The unreachable `yield` below makes the
        # abstract declaration an (empty) async generator, matching the annotation.
        if False:  # pragma: no cover
            yield
36+
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
from __future__ import annotations
2+
3+
import datetime as _dt
4+
from typing import Optional, List, Dict, Awaitable, AsyncIterator
5+
6+
# Re-export the Python ABC for user convenience and typing
7+
from ._reducestreamer_dtypes import ReduceStreamer as ReduceStreamer
8+
9+
10+
class Message:
    """Declaration of the ``Message`` type: a ``bytes`` value plus optional keys and tags."""

    keys: list[str] | None
    value: bytes
    tags: list[str] | None

    def __init__(
        self,
        value: bytes,
        keys: list[str] | None = ...,
        tags: list[str] | None = ...,
    ) -> None:
        """Build a message from ``value``; ``keys`` and ``tags`` may be omitted."""
        ...

    @staticmethod
    def message_to_drop() -> Message:
        """Return a ``Message`` (presumably one marking the output as dropped; confirm against the extension)."""
        ...
24+
25+
26+
class Datum:
    """Declaration of the ``Datum`` type handed to handlers: keys, payload, timestamps and headers."""

    keys: list[str]
    value: bytes
    watermark: _dt.datetime
    eventtime: _dt.datetime
    headers: dict[str, str]

    def __repr__(self) -> str:
        """Return the ``repr`` text of this datum."""
        ...

    def __str__(self) -> str:
        """Return the ``str`` text of this datum."""
        ...
36+
37+
38+
class IntervalWindow:
    """Declaration of a window described by a ``start`` and an ``end`` datetime."""

    start: _dt.datetime
    end: _dt.datetime
41+
42+
43+
class Metadata:
    """Declaration of the metadata object; it exposes the ``IntervalWindow`` as ``interval_window``."""

    interval_window: IntervalWindow
45+
46+
47+
class PyAsyncDatumStream:
    """
    Python-visible async iterator that yields Datum items from a Tokio mpsc channel.
    """

    def __init__(self) -> None: ...

    def __aiter__(self) -> PyAsyncDatumStream:
        """Return the async iterator (declared as the stream type itself)."""
        ...

    # The async-iterator protocol requires ``__anext__`` to return an awaitable.
    # Declared as a plain ``def``, the return type must therefore be
    # ``Awaitable[Datum]`` (not ``Datum``) for ``async for`` to type-check.
    def __anext__(self) -> Awaitable[Datum]:
        """Return an awaitable that resolves to the next ``Datum``."""
        ...
55+
56+
57+
class ReduceStreamAsyncServer:
    """Declaration of the reduce-stream server object exposed to Python."""

    def __init__(
        self,
        sock_file: str = ...,
        info_file: str = ...,
    ) -> None:
        """Create a server; both file paths are optional and have defaults."""
        ...

    def start(
        self,
        py_creator: type,
        init_args: tuple | None = ...,
    ) -> Awaitable[None]:
        """
        Start the server for the class ``py_creator``.

        ``init_args`` is an optional tuple of constructor arguments. The returned
        object is awaitable.
        """
        ...

    def stop(self) -> None:
        """Ask the server to shut down."""
        ...
67+
68+
69+
__all__ = [
70+
"Message",
71+
"Datum",
72+
"IntervalWindow",
73+
"Metadata",
74+
"PyAsyncDatumStream",
75+
"ReduceStreamAsyncServer",
76+
"ReduceStreamer",
77+
]
78+

packages/pynumaflow-lite/src/lib.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub mod mapstream;
55
pub mod pyiterables;
66
pub mod pyrs;
77
pub mod reduce;
8+
pub mod reducestream;
89
pub mod session_reduce;
910
pub mod sink;
1011
pub mod source;
@@ -46,6 +47,13 @@ fn session_reducer(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
4647
Ok(())
4748
}
4849

50+
/// Submodule: pynumaflow_lite.reducestreamer
51+
#[pymodule]
52+
fn reducestreamer(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
53+
crate::reducestream::populate_py_module(m)?;
54+
Ok(())
55+
}
56+
4957
/// Submodule: pynumaflow_lite.accumulator
5058
#[pymodule]
5159
fn accumulator(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
@@ -76,6 +84,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
7684
m.add_wrapped(pyo3::wrap_pymodule!(mapstreamer))?;
7785
m.add_wrapped(pyo3::wrap_pymodule!(reducer))?;
7886
m.add_wrapped(pyo3::wrap_pymodule!(session_reducer))?;
87+
m.add_wrapped(pyo3::wrap_pymodule!(reducestreamer))?;
7988
m.add_wrapped(pyo3::wrap_pymodule!(accumulator))?;
8089
m.add_wrapped(pyo3::wrap_pymodule!(sinker))?;
8190
m.add_wrapped(pyo3::wrap_pymodule!(sourcer))?;
@@ -125,6 +134,15 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
125134
.getattr("modules")?
126135
.set_item(fullname, &sub)?;
127136

137+
// Ensure it's importable as `pynumaflow_lite.reducestreamer` as well
138+
let binding = m.getattr("reducestreamer")?;
139+
let sub = binding.cast::<PyModule>()?;
140+
let fullname = "pynumaflow_lite.reducestreamer";
141+
sub.setattr("__name__", fullname)?;
142+
py.import("sys")?
143+
.getattr("modules")?
144+
.set_item(fullname, &sub)?;
145+
128146
// Ensure it's importable as `pynumaflow_lite.accumulator` as well
129147
let binding = m.getattr("accumulator")?;
130148
let sub = binding.cast::<PyModule>()?;
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use std::sync::Mutex;
2+
3+
pub mod server;
4+
5+
use pyo3::prelude::*;
6+
7+
// Re-export types from reduce module that are shared
8+
pub use crate::reduce::{Datum, IntervalWindow, Message, Metadata, PyAsyncDatumStream};
9+
10+
/// Async Reduce Stream Server that can be started from Python code, taking a class (creator).
11+
#[pyclass(module = "pynumaflow_lite.reducestreamer")]
12+
pub struct ReduceStreamAsyncServer {
13+
sock_file: String,
14+
info_file: String,
15+
shutdown_tx: Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
16+
}
17+
18+
#[pymethods]
19+
impl ReduceStreamAsyncServer {
20+
#[new]
21+
#[pyo3(signature = (sock_file="/var/run/numaflow/reducestream.sock".to_string(), info_file="/var/run/numaflow/reducestreamer-server-info".to_string()))]
22+
fn new(sock_file: String, info_file: String) -> Self {
23+
Self {
24+
sock_file,
25+
info_file,
26+
shutdown_tx: Mutex::new(None),
27+
}
28+
}
29+
30+
/// Start the server with the given Python class (creator).
31+
/// Only class-based reducers are supported (not function-based).
32+
/// init_args is an optional tuple of positional arguments to pass to the class constructor.
33+
#[pyo3(signature = (py_creator, init_args=None))]
34+
pub fn start<'a>(
35+
&self,
36+
py: Python<'a>,
37+
py_creator: Py<PyAny>,
38+
init_args: Option<Py<PyAny>>,
39+
) -> PyResult<Bound<'a, PyAny>> {
40+
let sock_file = self.sock_file.clone();
41+
let info_file = self.info_file.clone();
42+
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
43+
{
44+
let mut guard = self.shutdown_tx.lock().unwrap();
45+
*guard = Some(tx);
46+
}
47+
48+
pyo3_async_runtimes::tokio::future_into_py(py, async move {
49+
crate::reducestream::server::start(py_creator, init_args, sock_file, info_file, rx)
50+
.await
51+
.expect("server failed to start");
52+
Ok(())
53+
})
54+
}
55+
56+
/// Trigger server shutdown from Python (idempotent).
57+
pub fn stop(&self) -> PyResult<()> {
58+
if let Some(tx) = self.shutdown_tx.lock().unwrap().take() {
59+
let _ = tx.send(());
60+
}
61+
Ok(())
62+
}
63+
}
64+
65+
/// Helper to populate a PyModule with reduce stream types/functions.
66+
pub(crate) fn populate_py_module(m: &Bound<PyModule>) -> PyResult<()> {
67+
m.add_class::<Message>()?;
68+
m.add_class::<Datum>()?;
69+
m.add_class::<IntervalWindow>()?;
70+
m.add_class::<Metadata>()?;
71+
m.add_class::<ReduceStreamAsyncServer>()?;
72+
m.add_class::<PyAsyncDatumStream>()?;
73+
Ok(())
74+
}

0 commit comments

Comments
 (0)