Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ spec:
vertices:
- name: in
source:
# A self data generating source
http: { }
- name: session-counter
udf:
Expand Down
38 changes: 38 additions & 0 deletions packages/pynumaflow-lite/manifests/sourcetransform/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
FROM python:3.11-slim-bullseye AS builder

ENV PYTHONFAULTHANDLER=1 \
PYTHONUNBUFFERED=1 \
PYTHONHASHSEED=random \
PIP_NO_CACHE_DIR=on \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
POETRY_HOME="/opt/poetry" \
POETRY_VIRTUALENVS_IN_PROJECT=true \
POETRY_NO_INTERACTION=1 \
PYSETUP_PATH="/opt/pysetup"

ENV PATH="$POETRY_HOME/bin:$PATH"

RUN apt-get update \
&& apt-get install --no-install-recommends -y \
curl \
wget \
# deps for building python deps
build-essential \
&& apt-get install -y git \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
&& curl -sSL https://install.python-poetry.org | python3 -

FROM builder AS udf

WORKDIR $PYSETUP_PATH
COPY ./ ./

RUN pip install $PYSETUP_PATH/pynumaflow_lite-0.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl

RUN poetry lock
RUN poetry install --no-cache --no-root && \
rm -rf ~/.cache/pypoetry/

CMD ["python", "sourcetransform_event_filter.py"]

33 changes: 33 additions & 0 deletions packages/pynumaflow-lite/manifests/sourcetransform/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
To create the `wheel` file, refer [root](../../README.md)

## HOWTO build Image

```bash
docker build . -t quay.io/numaio/numaflow/pynumaflow-lite-sourcetransform-event-filter:v1 --load
```

Load it now to `k3d`

```bash
k3d image import quay.io/numaio/numaflow/pynumaflow-lite-sourcetransform-event-filter:v1
```

## Run the pipeline

```bash
kubectl apply -f pipeline.yaml
```

## About this example

This source transformer filters and routes messages based on their event time:

- **Messages before 2022**: Dropped
- **Messages within 2022**: Tagged with `within_year_2022` and event time set to Jan 1, 2022
- **Messages after 2022**: Tagged with `after_year_2022` and event time set to Jan 1, 2023

This demonstrates how source transformers can be used to:
1. Filter out old/stale data
2. Normalize event times
3. Route messages to different downstream vertices based on conditions

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: sourcetransform-event-filter
spec:
vertices:
- name: in
source:
# HTTP Source to control the event time
http: { }
transformer:
container:
image: quay.io/numaio/numaflow/pynumaflow-lite-sourcetransform-event-filter:v1
imagePullPolicy: Never
- name: sink
scale:
min: 1
sink:
log: { }
edges:
- from: in
to: sink

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[project]
name = "sourcetransform-event-filter"
version = "0.1.0"
description = "Source Transformer Event Filter Example"
authors = [
{ name = "Vigith Maurice", email = "vigith@gmail.com" }
]
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
]


[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"

Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import asyncio
import signal
from datetime import datetime, timezone
from pynumaflow_lite import sourcetransformer

# Define epoch timestamps for filtering
january_first_2022 = datetime(2022, 1, 1, tzinfo=timezone.utc)
january_first_2023 = datetime(2023, 1, 1, tzinfo=timezone.utc)


class EventFilter(sourcetransformer.SourceTransformer):
"""
A source transformer that filters and routes messages based on event time.

- Messages before 2022 are dropped
- Messages within 2022 are tagged with "within_year_2022"
- Messages after 2022 are tagged with "after_year_2022"
"""

async def handler(
self, keys: list[str], datum: sourcetransformer.Datum
) -> sourcetransformer.Messages:
val = datum.value
event_time = datum.event_time
messages = sourcetransformer.Messages()

if event_time < january_first_2022:
print(f"Got event time: {event_time}, it is before 2022, so dropping")
messages.append(sourcetransformer.Message.message_to_drop(event_time))
elif event_time < january_first_2023:
print(f"Got event time: {event_time}, it is within year 2022, so forwarding to within_year_2022")
messages.append(
sourcetransformer.Message(
value=val,
event_time=january_first_2022,
keys=keys,
tags=["within_year_2022"]
)
)
else:
print(f"Got event time: {event_time}, it is after year 2022, so forwarding to after_year_2022")
messages.append(
sourcetransformer.Message(
value=val,
event_time=january_first_2023,
keys=keys,
tags=["after_year_2022"]
)
)

return messages


# Optional: ensure default signal handlers are in place so asyncio.run can handle them cleanly.
signal.signal(signal.SIGINT, signal.default_int_handler)
try:
signal.signal(signal.SIGTERM, signal.SIG_DFL)
except AttributeError:
pass


async def start(f: callable):
server = sourcetransformer.SourceTransformAsyncServer()

# Register loop-level signal handlers so we control shutdown and avoid asyncio.run
# converting it into KeyboardInterrupt/CancelledError traces.
loop = asyncio.get_running_loop()
loop.set_debug(True)
print("Registering signal handlers", loop)
try:
loop.add_signal_handler(signal.SIGINT, lambda: server.stop())
loop.add_signal_handler(signal.SIGTERM, lambda: server.stop())
except (NotImplementedError, RuntimeError):
print("Failed to register signal handlers")
# add_signal_handler may not be available on some platforms/contexts; fallback below.
pass

try:
await server.start(f)
print("Shutting down gracefully...")
except asyncio.CancelledError:
# Fallback in case the task was cancelled by the runner
try:
server.stop()
except Exception:
pass
return


if __name__ == "__main__":
async_handler = EventFilter()
asyncio.run(start(async_handler))