Merged
39 changes: 39 additions & 0 deletions packages/pynumaflow-lite/manifests/reducestream/Dockerfile
@@ -0,0 +1,39 @@
FROM python:3.11-slim-bullseye AS builder

ENV PYTHONFAULTHANDLER=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONHASHSEED=random \
    PIP_NO_CACHE_DIR=on \
    PIP_DISABLE_PIP_VERSION_CHECK=on \
    PIP_DEFAULT_TIMEOUT=100 \
    POETRY_HOME="/opt/poetry" \
    POETRY_VIRTUALENVS_IN_PROJECT=true \
    POETRY_NO_INTERACTION=1 \
    PYSETUP_PATH="/opt/pysetup"

ENV PATH="$POETRY_HOME/bin:$PATH"

RUN apt-get update \
    && apt-get install --no-install-recommends -y \
        curl \
        wget \
        # deps for building python deps
        build-essential \
    && apt-get install -y git \
    && apt-get clean && rm -rf /var/lib/apt/lists/* \
    && curl -sSL https://install.python-poetry.org | python3 -

FROM builder AS udf

WORKDIR $PYSETUP_PATH
COPY ./ ./

# NOTE: place the built wheel in this directory before building the image
RUN pip install $PYSETUP_PATH/pynumaflow_lite-0.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl

RUN poetry lock
RUN poetry install --no-cache --no-root && \
    rm -rf ~/.cache/pypoetry/

CMD ["python", "reducestream_counter.py"]

38 changes: 38 additions & 0 deletions packages/pynumaflow-lite/manifests/reducestream/README.md
@@ -0,0 +1,38 @@
To create the `wheel` file, refer to the [root README](../../README.md). The built wheel must be placed in this directory before building the image (see the `NOTE` in the Dockerfile), as shown below.
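A sketch of copying the built wheel next to the Dockerfile, assuming a hypothetical `dist/` output directory at the package root (adjust the source path to wherever your build actually places the wheel):

```bash
# hypothetical build-output path; adjust to your environment
cp ../../dist/pynumaflow_lite-0.1.0-*.whl .
```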

## How to build the image

```bash
docker build . -t quay.io/numaio/numaflow/pynumaflow-lite-reducestream-counter:v1 --load
```

Now load the image into your local cluster (`k3d` or Minikube):

### `k3d`

```bash
k3d image import quay.io/numaio/numaflow/pynumaflow-lite-reducestream-counter:v1
```

### Minikube

```bash
minikube image load quay.io/numaio/numaflow/pynumaflow-lite-reducestream-counter:v1
```

#### Delete image from minikube

`minikube` will not overwrite an image that has already been loaded; if you are reusing the same tag, delete the old image first and then load the new one.

```bash
minikube image rm quay.io/numaio/numaflow/pynumaflow-lite-reducestream-counter:v1
```

## Run the pipeline

```bash
kubectl apply -f pipeline.yaml
```
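
To verify the output, tail the sink vertex logs; a sketch assuming Numaflow's standard pod labels (`numaflow.numaproj.io/pipeline-name` and `numaflow.numaproj.io/vertex-name`):

```bash
kubectl logs -f -l numaflow.numaproj.io/pipeline-name=reducestream-counter,numaflow.numaproj.io/vertex-name=sink
```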


37 changes: 37 additions & 0 deletions packages/pynumaflow-lite/manifests/reducestream/pipeline.yaml
@@ -0,0 +1,37 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: reducestream-counter
spec:
  vertices:
    - name: in
      source:
        # A self data generating source
        generator:
          rpu: 10
          duration: 1s
    - name: reducestream
      partitions: 1
      udf:
        container:
          image: quay.io/numaio/numaflow/pynumaflow-lite-reducestream-counter:v1
          imagePullPolicy: Never
        groupBy:
          window:
            fixed:
              length: 10s
              streaming: true
          keyed: true
          storage:
            emptyDir: { }
    - name: sink
      scale:
        min: 1
      sink:
        log: { }
  edges:
    - from: in
      to: reducestream
    - from: reducestream
      to: sink

16 changes: 16 additions & 0 deletions packages/pynumaflow-lite/manifests/reducestream/pyproject.toml
@@ -0,0 +1,16 @@
[project]
name = "reducestream-counter"
version = "0.1.0"
description = "Reduce streaming counter example using pynumaflow-lite"
authors = [
    { name = "Vigith Maurice", email = "vigith@gmail.com" }
]
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
]

[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"

108 changes: 108 additions & 0 deletions packages/pynumaflow-lite/manifests/reducestream/reducestream_counter.py
@@ -0,0 +1,108 @@
"""
Reduce Streaming Counter Example

This example demonstrates how to use ReduceStreamer to emit intermediate results
as data arrives, rather than waiting until all data is received.

The counter increments for each datum and emits a message every 10 items,
plus a final message at the end.
"""
import asyncio
import signal
from collections.abc import AsyncIterable, AsyncIterator

from pynumaflow_lite import reducestreamer


class ReduceCounter(reducestreamer.ReduceStreamer):
    """
    A reduce streaming counter that emits intermediate results.

    This demonstrates the key difference from regular Reducer:
    - Regular Reducer: waits for all data, then returns Messages
    - ReduceStreamer: yields Message objects incrementally as an async iterator
    """

    def __init__(self, initial: int = 0) -> None:
        self.counter = initial

    async def handler(
        self,
        keys: list[str],
        datums: AsyncIterable[reducestreamer.Datum],
        md: reducestreamer.Metadata,
    ) -> AsyncIterator[reducestreamer.Message]:
        """
        Process datums and yield messages incrementally.

        Args:
            keys: List of keys for this window
            datums: Async iterable of incoming data
            md: Metadata containing window information

        Yields:
            Message objects to send to the next vertex
        """
        iw = md.interval_window
        print(f"Handler started for keys={keys}, window=[{iw.start}, {iw.end}]")
Contributor:

nit: log.info instead of print ?

Member Author:

i was using native entities so far in examples as much as possible

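For reference, a minimal sketch of the suggested `logging` alternative (standard library only; the example keeps `print`, as noted above):

```python
import logging

# Basic logger setup; inside handler() the print call would become e.g.:
#   log.info("Handler started for keys=%s, window=[%s, %s]", keys, iw.start, iw.end)
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
```
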
        async for _ in datums:
            self.counter += 1

            # Emit intermediate result every 10 items
            if self.counter % 10 == 0:
                msg = (
                    f"counter:{self.counter} "
                    f"interval_window_start:{iw.start} "
                    f"interval_window_end:{iw.end}"
                ).encode()
                print(f"Yielding intermediate result: counter={self.counter}")
                # Early release of data - this is the key feature of reduce streaming!
                yield reducestreamer.Message(msg, keys=keys)

        # Emit final result
        msg = (
            f"counter:{self.counter} (FINAL) "
            f"interval_window_start:{iw.start} "
            f"interval_window_end:{iw.end}"
        ).encode()
        print(f"Yielding final result: counter={self.counter}")
        yield reducestreamer.Message(msg, keys=keys)


# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly.
signal.signal(signal.SIGINT, signal.default_int_handler)
try:
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
except AttributeError:
    pass


async def start(creator: type, init_args: tuple):
    """Start the reduce stream server."""
    sock_file = "/var/run/numaflow/reducestream.sock"
    server_info_file = "/var/run/numaflow/reducestreamer-server-info"
    server = reducestreamer.ReduceStreamAsyncServer(sock_file, server_info_file)

    loop = asyncio.get_running_loop()
    try:
        loop.add_signal_handler(signal.SIGINT, lambda: server.stop())
        loop.add_signal_handler(signal.SIGTERM, lambda: server.stop())
    except (NotImplementedError, RuntimeError):
        pass

    try:
        print("Starting Reduce Stream Counter Server...")
        await server.start(creator, init_args)
        print("Shutting down gracefully...")
    except asyncio.CancelledError:
        try:
            server.stop()
        except Exception:
            pass
        return


if __name__ == "__main__":
    asyncio.run(start(ReduceCounter, (0,)))