Slipstream provides a data-flow model to simplify development of stateful streaming applications.
pip install slipstream-async
from asyncio import run
from slipstream import handle, stream
async def messages():
for emoji in '🏆📞🐟👌':
yield emoji
@handle(messages(), sink=[print])
def handle_message(msg):
yield f'Hello {msg}!'
if __name__ == '__main__':
run(stream())
Hello 🏆!
Hello 📞!
Hello 🐟!
Hello 👌!
Async iterables
are sources, (async) callables
are sinks.
Decorate handler functions using handle
, then run stream
to start processing:
Multiple sources and sinks can be provided to establish many-to-many relations between them.
The 4 emoji's were printed using the callable print
.
Install aiokafka
(latest) along with slipstream:
pip install slipstream-async[kafka]
Spin up a local Kafka broker with docker-compose.yml, using localhost:29091
to connect:
docker compose up broker -d
Follow the docs and set up a Kafka connection: slipstream.readthedocs.io.
slipstream.handle
: bind streams (iterables) and sinks (callables) to user defined handler functionsslipstream.stream
: start streamingslipstream.Topic
: consume from (iterable), and produce to (callable) kafka using aiokafkaslipstream.Cache
: store data to disk using rocksdictslipstream.Conf
: set global kafka configuration (can be overridden per topic)slipstream.codecs.JsonCodec
: serialize and deserialize json messages