diff --git a/not_my_board/_hub.py b/not_my_board/_hub.py index 98f70d7..d2f7546 100644 --- a/not_my_board/_hub.py +++ b/not_my_board/_hub.py @@ -30,39 +30,22 @@ async def asgi_app(request): return elif isinstance(request, asgineer.HttpRequest): if request.path == "/api/v1/places": - return {"places": [p.desc for p in Place.all()]} + return await _hub.get_places() return 404, {}, "Page not found" async def _handle_agent(ws): await _authorize_ws(ws) client_ip = ws.scope["client"][0] - async with Place.reservation_context(client_ip) as ctx: - api = AgentAPI(ctx) - server = jsonrpc.Server(ws.send, ws.receive_iter(), api) - await server.serve_forever() - - -class AgentAPI: - def __init__(self, reservation_context): - self._reservation_context = reservation_context - - async def reserve(self, candidate_ids): - place = await Place.reserve(candidate_ids, self._reservation_context) - return place.desc["id"] - - async def return_reservation(self, place_id): - await Place.return_by_id(place_id, self._reservation_context) + server = jsonrpc.Server(ws.send, ws.receive_iter()) + await _hub.agent_communicate(client_ip, server) async def _handle_exporter(ws): await _authorize_ws(ws) client_ip = ws.scope["client"][0] exporter = jsonrpc.Proxy(ws.send, ws.receive_iter()) - async with util.background_task(exporter.io_loop()) as io_loop: - place = await exporter.get_place() - with Place.register(place, exporter, client_ip): - await io_loop + await _hub.exporter_communicate(client_ip, exporter) async def _authorize_ws(ws): @@ -81,6 +64,38 @@ async def _authorize_ws(ws): await ws.accept() +class Hub: + async def get_places(self): + return {"places": [p.desc for p in Place.all()]} + + async def agent_communicate(self, client_ip, rpc): + async with Place.reservation_context(client_ip) as ctx: + api = AgentAPI(ctx) + rpc.set_api_object(api) + await rpc.serve_forever() + + async def exporter_communicate(self, client_ip, rpc): + async with util.background_task(rpc.io_loop()) as io_loop: + place = await rpc.get_place() + with Place.register(place, rpc, client_ip): + await io_loop + + +_hub = Hub() + + +class AgentAPI: + def __init__(self, reservation_context): + self._reservation_context = reservation_context + + async def reserve(self, candidate_ids): + place = await Place.reserve(candidate_ids, self._reservation_context) + return place.desc["id"] + + async def return_reservation(self, place_id): + await Place.return_by_id(place_id, self._reservation_context) + + class Place: _all_places = {} _next_id = 1 diff --git a/not_my_board/_jsonrpc.py b/not_my_board/_jsonrpc.py index bafc1a4..b79349f 100644 --- a/not_my_board/_jsonrpc.py +++ b/not_my_board/_jsonrpc.py @@ -30,7 +30,7 @@ def __init__(self, code, message, data): class Server: - def __init__(self, send, receive_iter, api_obj): + def __init__(self, send, receive_iter, api_obj=None): super().__init__() self._send = send self._receive_iter = receive_iter @@ -38,6 +38,9 @@ def __init__(self, send, receive_iter, api_obj): self._tasks = set() self._tasks_by_id = {} + def set_api_object(self, api_obj): + self._api_obj = api_obj + async def serve_forever(self): try: async for raw_data in self._receive_iter: