diff --git a/packages/pynumaflow-lite/manifests/reducestream/Dockerfile b/packages/pynumaflow-lite/manifests/reducestream/Dockerfile new file mode 100644 index 00000000..14d1b02c --- /dev/null +++ b/packages/pynumaflow-lite/manifests/reducestream/Dockerfile @@ -0,0 +1,39 @@ +FROM python:3.11-slim-bullseye AS builder + +ENV PYTHONFAULTHANDLER=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONHASHSEED=random \ + PIP_NO_CACHE_DIR=on \ + PIP_DISABLE_PIP_VERSION_CHECK=on \ + PIP_DEFAULT_TIMEOUT=100 \ + POETRY_HOME="/opt/poetry" \ + POETRY_VIRTUALENVS_IN_PROJECT=true \ + POETRY_NO_INTERACTION=1 \ + PYSETUP_PATH="/opt/pysetup" + +ENV PATH="$POETRY_HOME/bin:$PATH" + +RUN apt-get update \ + && apt-get install --no-install-recommends -y \ + curl \ + wget \ + # deps for building python deps + build-essential \ + && apt-get install -y git \ + && apt-get clean && rm -rf /var/lib/apt/lists/* \ + && curl -sSL https://install.python-poetry.org | python3 - + +FROM builder AS udf + +WORKDIR $PYSETUP_PATH +COPY ./ ./ + +# NOTE: place the built wheel in this directory before building the image +RUN pip install $PYSETUP_PATH/pynumaflow_lite-0.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl + +RUN poetry lock +RUN poetry install --no-cache --no-root && \ + rm -rf ~/.cache/pypoetry/ + +CMD ["python", "reducestream_counter.py"] + diff --git a/packages/pynumaflow-lite/manifests/reducestream/README.md b/packages/pynumaflow-lite/manifests/reducestream/README.md new file mode 100644 index 00000000..dc62fd6f --- /dev/null +++ b/packages/pynumaflow-lite/manifests/reducestream/README.md @@ -0,0 +1,38 @@ +To create the `wheel` file, refer to the [root README](../../README.md). + +## HOWTO build Image + +```bash +docker build . 
-t quay.io/numaio/numaflow/pynumaflow-lite-reducestream-counter:v1 --load +``` + +Now load the image into your local cluster (`k3d` or Minikube). + +### `k3d` + +```bash +k3d image import quay.io/numaio/numaflow/pynumaflow-lite-reducestream-counter:v1 +``` + +### Minikube + +```bash +minikube image load quay.io/numaio/numaflow/pynumaflow-lite-reducestream-counter:v1 +``` + +#### Delete image from minikube + +`minikube` may not replace an image that was already loaded under the same tag; if you reuse +the tag, delete the old image first and then load the new one. + +```bash +minikube image rm quay.io/numaio/numaflow/pynumaflow-lite-reducestream-counter:v1 +``` + +## Run the pipeline + +```bash +kubectl apply -f pipeline.yaml +``` + + diff --git a/packages/pynumaflow-lite/manifests/reducestream/pipeline.yaml b/packages/pynumaflow-lite/manifests/reducestream/pipeline.yaml new file mode 100644 index 00000000..c2c3797f --- /dev/null +++ b/packages/pynumaflow-lite/manifests/reducestream/pipeline.yaml @@ -0,0 +1,37 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: reducestream-counter +spec: + vertices: + - name: in + source: + # A self data generating source + generator: + rpu: 10 + duration: 1s + - name: reducestream + partitions: 1 + udf: + container: + image: quay.io/numaio/numaflow/pynumaflow-lite-reducestream-counter:v1 + imagePullPolicy: Never + groupBy: + window: + fixed: + length: 10s + streaming: true + keyed: true + storage: + emptyDir: { } + - name: sink + scale: + min: 1 + sink: + log: { } + edges: + - from: in + to: reducestream + - from: reducestream + to: sink + diff --git a/packages/pynumaflow-lite/manifests/reducestream/pyproject.toml b/packages/pynumaflow-lite/manifests/reducestream/pyproject.toml new file mode 100644 index 00000000..c5eb1a17 --- /dev/null +++ b/packages/pynumaflow-lite/manifests/reducestream/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "reducestream-counter" +version = "0.1.0" +description = "Reduce streaming counter example using pynumaflow-lite" +authors = [ + { name = "Vigith Maurice", email = 
class ReduceCounter(reducestreamer.ReduceStreamer):
    """Streaming reducer that counts datums and reports progress as it goes.

    Every datum read from the stream increments ``self.counter``. Each time the
    running counter reaches a multiple of 10, a progress message is yielded
    straight away instead of being held back until the stream ends. Once the
    stream is exhausted, one more message tagged ``(FINAL)`` is yielded.
    """

    def __init__(self, initial: int = 0) -> None:
        # Running count. The "every 10" check in ``handler`` is made against
        # this value, so a non-zero ``initial`` shifts when progress is emitted.
        self.counter = initial

    async def handler(
        self,
        keys: list[str],
        datums: AsyncIterable[reducestreamer.Datum],
        md: reducestreamer.Metadata,
    ) -> AsyncIterator[reducestreamer.Message]:
        """Count incoming datums, yielding progress and a final summary.

        Args:
            keys: Keys attached to every message this handler yields.
            datums: Async stream of incoming datums; only their number matters,
                their contents are never read.
            md: Metadata whose ``interval_window`` supplies the ``start`` and
                ``end`` values embedded in each message.

        Yields:
            A message each time the running counter is a multiple of 10, then
            one closing message marked ``(FINAL)`` after the stream ends.
        """
        window = md.interval_window
        print(f"Handler started for keys={keys}, window=[{window.start}, {window.end}]")

        async for _datum in datums:
            self.counter += 1

            # Only every tenth count produces an intermediate message.
            if self.counter % 10 != 0:
                continue

            progress = (
                f"counter:{self.counter} "
                f"interval_window_start:{window.start} "
                f"interval_window_end:{window.end}"
            ).encode()
            print(f"Yielding intermediate result: counter={self.counter}")
            # Release the partial result now rather than at the end of the stream.
            yield reducestreamer.Message(progress, keys=keys)

        # The stream is exhausted: always emit a closing summary, even when the
        # last datum already triggered an intermediate message.
        summary = (
            f"counter:{self.counter} (FINAL) "
            f"interval_window_start:{window.start} "
            f"interval_window_end:{window.end}"
        ).encode()
        print(f"Yielding final result: counter={self.counter}")
        yield reducestreamer.Message(summary, keys=keys)
# Put the stock SIGINT/SIGTERM dispositions back in place before the event loop
# starts, so asyncio.run begins from a known signal state.
signal.signal(signal.SIGINT, signal.default_int_handler)
try:
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
except AttributeError:
    pass


async def start(creator: type, init_args: tuple) -> None:
    """Start the reduce stream server and run it until it stops.

    Args:
        creator: Class handed to ``server.start``, together with ``init_args``,
            to build the reducer.
        init_args: Positional arguments passed alongside ``creator``.

    SIGINT and SIGTERM are wired to ``server.stop`` where the running event
    loop supports signal handlers. If this coroutine is cancelled, the server
    is stopped on a best-effort basis and the cancellation is absorbed.
    """
    sock_file = "/var/run/numaflow/reducestream.sock"
    server_info_file = "/var/run/numaflow/reducestreamer-server-info"
    server = reducestreamer.ReduceStreamAsyncServer(sock_file, server_info_file)

    loop = asyncio.get_running_loop()
    try:
        # Pass the bound method directly; wrapping it in a lambda adds nothing.
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, server.stop)
    except (NotImplementedError, RuntimeError):
        # Not every event loop or thread can install signal handlers; the
        # server still runs, just without signal-driven shutdown.
        pass

    try:
        print("Starting Reduce Stream Counter Server...")
        await server.start(creator, init_args)
        print("Shutting down gracefully...")
    except asyncio.CancelledError:
        try:
            server.stop()
        except Exception as exc:
            # Shutdown stays best-effort, but report the failure instead of
            # discarding it silently.
            print(f"Ignoring error while stopping server: {exc!r}")
        return


if __name__ == "__main__":
    asyncio.run(start(ReduceCounter, (0,)))