-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathwebsocket_handler.py
More file actions
56 lines (43 loc) · 1.81 KB
/
websocket_handler.py
File metadata and controls
56 lines (43 loc) · 1.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import asyncio
import json
from typing import Optional
from lnbits.settings import settings
from lnbits.tasks import create_permanent_unique_task
from loguru import logger
from websockets.client import connect
from .crud import get_or_create_satspay_settings
# Decoded JSON messages read from the websocket; filled by consumer_handler.
ws_receive_queue: asyncio.Queue[dict] = asyncio.Queue()
# Outgoing messages waiting to be JSON-encoded and sent by producer_handler.
ws_send_queue: asyncio.Queue[dict] = asyncio.Queue()
# Handle of the websocket task started by restart_websocket_task; None until
# that function has been called for the first time.
websocket_task: Optional[asyncio.Task] = None
def restart_websocket_task():
    """Replace the running websocket task with a freshly created one.

    Cancels the task currently stored in the module-level ``websocket_task``
    (if there is one) and stores the task returned by
    ``create_permanent_unique_task`` for ``websocket_handler`` in its place.
    """
    global websocket_task
    logger.info("Restarting websocket task...")
    previous_task = websocket_task
    if previous_task:
        previous_task.cancel()
    task_name = "ext_satspay_websocket"
    websocket_task = create_permanent_unique_task(task_name, websocket_handler)
async def consumer_handler(websocket):
    """Push every message read from ``websocket`` onto ``ws_receive_queue``.

    Each message is logged at debug level (first 69 characters only), decoded
    from JSON and enqueued without waiting. The coroutine returns when
    iteration over ``websocket`` ends.
    """
    async for raw_message in websocket:
        preview = raw_message[:69]
        logger.debug(f"Received message: {preview}...")
        payload = json.loads(raw_message)
        ws_receive_queue.put_nowait(payload)
async def producer_handler(websocket):
    """Send queued messages from ``ws_send_queue`` over ``websocket``.

    Keeps running for as long as ``settings.lnbits_running`` is truthy; the
    flag is checked before each wait on the queue. Every item is JSON-encoded,
    logged at debug level (first 69 characters only) and then sent.
    """
    while True:
        if not settings.lnbits_running:
            break
        outgoing = await ws_send_queue.get()
        encoded = json.dumps(outgoing)
        logger.debug(f"Send message: {encoded[:69]}...")
        await websocket.send(encoded)
async def websocket_handler():
    """Run one websocket session against the configured mempool endpoint.

    Loads the satspay settings, derives the websocket URI from
    ``mempool_url``, connects, and runs ``consumer_handler`` and
    ``producer_handler`` concurrently until either of them finishes.

    Raises:
        Exception: always, once the session has ended without this coroutine
            being cancelled, so the caller can tell that the handler stopped.
    """
    satspay_settings = await get_or_create_satspay_settings()
    # Swap only the leading scheme (http -> ws, https -> wss). An unbounded
    # replace would also rewrite any later "http" in the host name or path.
    uri = f"{satspay_settings.mempool_url}/api/v1/ws".replace("http", "ws", 1)
    logger.info(f"websocket_handler: Connecting to {uri}...")
    async with connect(uri) as websocket:
        logger.info(f"websocket_handler: Connected to {uri}")
        tasks = [
            asyncio.create_task(consumer_handler(websocket)),
            asyncio.create_task(producer_handler(websocket)),
        ]
        try:
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # asyncio.wait() does not cancel the tasks it waits on, so this
            # must also run when this coroutine itself is cancelled (e.g. by
            # restart_websocket_task). Otherwise the producer would stay
            # blocked on ws_send_queue.get() and take a later message that
            # was meant for the next connection.
            for task in tasks:
                task.cancel()  # no-op for a task that already finished
            # Wait for both tasks to settle before the connection is closed;
            # this also retrieves their exceptions.
            results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            # CancelledError is not an Exception subclass, so only real
            # failures of a child task are reported here.
            if isinstance(result, Exception):
                logger.warning(f"websocket_handler: task failed: {result!r}")
    raise Exception("websocket_handler unexpectedly finished")