@@ -230,3 +230,63 @@ Custom codecs can be created using ``ICodec``:
230
230
decoder = BinaryDecoder(bytes_reader)
231
231
reader = DatumReader(self.schema)
232
232
return cast(object, reader.read(decoder))
233
+
234
+ Endpoint
235
+ --------
236
+
237
+ To share streaming data using API endpoints, install ``fastapi ``.
238
+
239
+ ::
240
+
241
+ from asyncio import gather, run, sleep
242
+ from time import strftime
243
+
244
+ from fastapi import FastAPI
245
+ from fastapi.responses import StreamingResponse
246
+ from uvicorn import Config, Server
247
+
248
+ from slipstream import Cache, handle, stream
249
+
250
+ app, cache = FastAPI(), Cache('db')
251
+
252
+ async def timer(interval=1.0):
253
+ while True:
254
+ yield
255
+ await sleep(interval)
256
+
257
+ async def cache_value_updates():
258
+ async for _, v in cache.__aiter__():
259
+ yield v + '\n'
260
+
261
+ @handle(timer(), sink=[cache, print])
262
+ def tick_tock():
263
+ yield 'time', strftime('%H:%M:%S')
264
+
265
+ @app.get('/updates')
266
+ async def updates():
267
+ return StreamingResponse(
268
+ cache_value_updates(),
269
+ media_type='text/event-stream'
270
+ )
271
+
272
+ async def main():
273
+ config = Config(app=app, host='0.0.0.0', port=8000)
274
+ server = Server(config)
275
+ await gather(stream(), server.serve())
276
+
277
+ if __name__ == '__main__':
278
+ run(main())
279
+
280
+ When we call the following url ``http://127.0.0.1:8000/updates `` it will stream the cache updates:
281
+
282
+ ::
283
+
284
+ curl -N http://127.0.0.1:8000/updates
285
+
286
+ ::
287
+
288
+ 00:16:57
289
+ 00:16:58
290
+ 00:16:59
291
+ 00:17:00
292
+ ...
0 commit comments