Skip to content

Commit 24ce158

Browse files
authored
feat: source-transformer for pynumaflow-lite (#293)
Signed-off-by: Vigith Maurice <vigith@gmail.com>
1 parent 62f698b commit 24ce158

File tree

10 files changed

+723
-2
lines changed

10 files changed

+723
-2
lines changed

packages/pynumaflow-lite/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,7 @@ path = "tests/bin/sink.rs"
5959
[[bin]]
6060
name = "test_source"
6161
path = "tests/bin/source.rs"
62+
63+
[[bin]]
64+
name = "test_sourcetransform"
65+
path = "tests/bin/sourcetransform.rs"

packages/pynumaflow-lite/pynumaflow_lite/__init__.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,13 @@
4848
except Exception: # pragma: no cover
4949
sourcer = None
5050

51-
# Surface the Python Mapper, BatchMapper, MapStreamer, Reducer, SessionReducer, ReduceStreamer, Accumulator, Sinker, and Sourcer classes under the extension submodules for convenient access
51+
try:
52+
sourcetransformer = _import_module(__name__ + ".sourcetransformer")
53+
except Exception: # pragma: no cover
54+
sourcetransformer = None
55+
56+
# Surface the Python Mapper, BatchMapper, MapStreamer, Reducer, SessionReducer, ReduceStreamer, Accumulator, Sinker,
57+
# Sourcer, and SourceTransformer classes under the extension submodules for convenient access
5258
from ._map_dtypes import Mapper
5359
from ._batchmapper_dtypes import BatchMapper
5460
from ._mapstream_dtypes import MapStreamer
@@ -58,6 +64,7 @@
5864
from ._accumulator_dtypes import Accumulator
5965
from ._sink_dtypes import Sinker
6066
from ._source_dtypes import Sourcer
67+
from ._sourcetransformer_dtypes import SourceTransformer
6168

6269
if mapper is not None:
6370
try:
@@ -113,8 +120,15 @@
113120
except Exception:
114121
pass
115122

123+
if sourcetransformer is not None:
124+
try:
125+
setattr(sourcetransformer, "SourceTransformer", SourceTransformer)
126+
except Exception:
127+
pass
128+
116129
# Public API
117-
__all__ = ["mapper", "batchmapper", "mapstreamer", "reducer", "session_reducer", "reducestreamer", "accumulator", "sinker", "sourcer"]
130+
__all__ = ["mapper", "batchmapper", "mapstreamer", "reducer", "session_reducer", "reducestreamer", "accumulator",
131+
"sinker", "sourcer", "sourcetransformer"]
118132

119133
__doc__ = pynumaflow_lite.__doc__
120134
if hasattr(pynumaflow_lite, "__all__"):
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from abc import ABCMeta, abstractmethod
2+
from pynumaflow_lite.sourcetransformer import Datum, Messages
3+
4+
5+
class SourceTransformer(metaclass=ABCMeta):
6+
"""
7+
Provides an interface to write a SourceTransformer
8+
which will be exposed over a gRPC server.
9+
10+
A SourceTransformer is used for transforming and assigning event time
11+
to input messages from a source.
12+
"""
13+
14+
def __call__(self, *args, **kwargs):
15+
"""
16+
This allows to execute the handler function directly if
17+
class instance is sent as a callable.
18+
"""
19+
return self.handler(*args, **kwargs)
20+
21+
@abstractmethod
22+
async def handler(self, keys: list[str], datum: Datum) -> Messages:
23+
"""
24+
Implement this handler function which implements the SourceTransformer interface.
25+
26+
Args:
27+
keys: The keys associated with the message.
28+
datum: The input datum containing value, event_time, watermark, and headers.
29+
30+
Returns:
31+
Messages: A collection of transformed messages with potentially modified
32+
event times and tags for conditional forwarding.
33+
"""
34+
pass
35+
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
from __future__ import annotations
2+
3+
from typing import Optional, List, Dict, Callable, Awaitable, Any
4+
import datetime as _dt
5+
6+
# Re-export the Python ABC for user convenience and typing
7+
from ._sourcetransformer_dtypes import SourceTransformer as SourceTransformer
8+
9+
10+
class Messages:
11+
def __init__(self) -> None: ...
12+
13+
def append(self, message: Message) -> None: ...
14+
15+
def __repr__(self) -> str: ...
16+
17+
def __str__(self) -> str: ...
18+
19+
20+
class Message:
21+
keys: Optional[List[str]]
22+
value: bytes
23+
event_time: _dt.datetime
24+
tags: Optional[List[str]]
25+
26+
def __init__(
27+
self,
28+
value: bytes,
29+
event_time: _dt.datetime,
30+
keys: Optional[List[str]] = ...,
31+
tags: Optional[List[str]] = ...,
32+
) -> None: ...
33+
34+
@staticmethod
35+
def message_to_drop(event_time: _dt.datetime) -> Message: ...
36+
37+
38+
class Datum:
39+
# Read-only attributes provided by the extension
40+
keys: List[str]
41+
value: bytes
42+
watermark: _dt.datetime
43+
event_time: _dt.datetime
44+
headers: Dict[str, str]
45+
46+
def __repr__(self) -> str: ...
47+
48+
def __str__(self) -> str: ...
49+
50+
51+
class SourceTransformAsyncServer:
52+
def __init__(
53+
self,
54+
sock_file: str | None = ...,
55+
info_file: str | None = ...,
56+
) -> None: ...
57+
58+
def start(self, py_func: Callable[..., Any]) -> Awaitable[None]: ...
59+
60+
def stop(self) -> None: ...
61+
62+
63+
__all__ = [
64+
"Messages",
65+
"Message",
66+
"Datum",
67+
"SourceTransformAsyncServer",
68+
"SourceTransformer",
69+
]
70+

packages/pynumaflow-lite/src/lib.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub mod reducestream;
99
pub mod session_reduce;
1010
pub mod sink;
1111
pub mod source;
12+
pub mod sourcetransform;
1213

1314
use pyo3::prelude::*;
1415

@@ -75,6 +76,13 @@ fn sourcer(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
7576
Ok(())
7677
}
7778

79+
/// Submodule: pynumaflow_lite.sourcetransformer
80+
#[pymodule]
81+
fn sourcetransformer(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
82+
crate::sourcetransform::populate_py_module(m)?;
83+
Ok(())
84+
}
85+
7886
/// Top-level Python module `pynumaflow_lite` with submodules like `mapper`, `batchmapper`, and `mapstreamer`.
7987
#[pymodule]
8088
fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
@@ -88,6 +96,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
8896
m.add_wrapped(pyo3::wrap_pymodule!(accumulator))?;
8997
m.add_wrapped(pyo3::wrap_pymodule!(sinker))?;
9098
m.add_wrapped(pyo3::wrap_pymodule!(sourcer))?;
99+
m.add_wrapped(pyo3::wrap_pymodule!(sourcetransformer))?;
91100

92101
// Ensure it's importable as `pynumaflow_lite.mapper` as well as attribute access
93102
let binding = m.getattr("mapper")?;
@@ -170,5 +179,14 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
170179
.getattr("modules")?
171180
.set_item(fullname, &sub)?;
172181

182+
// Ensure it's importable as `pynumaflow_lite.sourcetransformer` as well
183+
let binding = m.getattr("sourcetransformer")?;
184+
let sub = binding.cast::<PyModule>()?;
185+
let fullname = "pynumaflow_lite.sourcetransformer";
186+
sub.setattr("__name__", fullname)?;
187+
py.import("sys")?
188+
.getattr("modules")?
189+
.set_item(fullname, &sub)?;
190+
173191
Ok(())
174192
}

0 commit comments

Comments
 (0)