-
Picows currently provides a fairly low-level interface and has no high-level features such as an automatic ping-pong mechanism. The easiest approach is to create a task that sends an exchange-specific ping in a loop and checks, on the next iteration, whether a pong was received. This can be improved (if necessary) to take into account whether the last message was received less than N seconds ago. But for detecting a stale connection, imho, even this simple approach is good enough. Unlike aiohttp and probably websockets (I haven't used it), you have full control over the ping-pong exchange, so you can log and troubleshoot disconnects more easily. At least this was the case for me: I sometimes didn't understand why aiohttp was reporting disconnects.
I haven't run or tested this code.
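The loop described above can be sketched roughly as follows. This is an illustration only, not the snippet from the reply: the `Heartbeat` class and its `send_ping`, `on_stale`, and `notify_pong` names are hypothetical, and the transport is hidden behind plain callables so the logic does not depend on any particular picows version.

```python
import asyncio
from typing import Callable


class Heartbeat:
    """Sends a ping every `interval` seconds and reports a stale connection
    when the previous ping was not answered before the next iteration."""

    def __init__(self, send_ping: Callable[[], None],
                 on_stale: Callable[[], None], interval: float = 20.0):
        self._send_ping = send_ping    # hypothetical: wraps the transport's send call
        self._on_stale = on_stale      # hypothetical: e.g. disconnects the transport
        self._interval = interval
        self._pong_received = True     # nothing is outstanding before the first ping

    def notify_pong(self) -> None:
        """Call this from the frame handler when the exchange's pong arrives."""
        self._pong_received = True

    async def run(self) -> None:
        while True:
            if not self._pong_received:
                # The previous ping was never answered: treat the link as stale.
                self._on_stale()
                return
            self._pong_received = False
            self._send_ping()
            await asyncio.sleep(self._interval)
```

With picows, `run()` could be started via `asyncio.create_task` from `on_ws_connected`, `send_ping` could wrap `transport.send(WSMsgType.TEXT, b"ping")`, and `notify_pong` could be called from `on_ws_frame` when the pong is recognized. Keeping the flag logic separate from the transport also makes it easy to log exactly when a pong went missing.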
-
Thanks for your reply! I have a general idea of what to do now. I also have a question. For exchanges like OKX, we need to subscribe over multiple WebSocket connections at the same time. We need to send a payload like this:

```python
payload = json.dumps({
    "op": "subscribe",
    "args": params
})
```

As I understand it, I need to implement this. Below is my implementation using websockets:

```python
import asyncio
import json
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Literal

import orjson
import websockets

# Not shown in this post: log_register, Limiter, client, MARKET_URLS.


class WebsocketManager(ABC):
    def __init__(self, config: Dict[str, Any] = None, ping_interval: int = 5, ping_timeout: int = 5, close_timeout: int = 1, max_queue: int = 12):
        self.queues: Dict[str, asyncio.Queue] = {}
        self.tasks: List[asyncio.Task] = []
        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self.close_timeout = close_timeout
        self.max_queue = max_queue
        if config:
            self.api_key = config.get("apiKey", None)
            self.secret = config.get("secret", None)
            self.password = config.get("password", None)
        self.logger = log_register.get_logger(self.__class__.__name__, level="INFO", flush=True)

    async def consume(self, queue_id: str, callback: Callable[..., Any] = None, *args, **kwargs):
        # Forward every queued message to the callback (sync or async).
        while True:
            msg = await self.queues[queue_id].get()
            if asyncio.iscoroutinefunction(callback):
                await callback(msg, *args, **kwargs)
            else:
                callback(msg, *args, **kwargs)
            self.queues[queue_id].task_done()

    @abstractmethod
    async def _subscribe(self, symbol: str, typ: str, channel: str, queue_id: str):
        pass

    async def subscribe(self, symbols: List[str], typ: str, channel: str, callback: Callable[[Dict[str, Any]], None] = None, *args, **kwargs):
        # One queue, one consumer task and one connection task per symbol.
        for symbol in symbols:
            queue_id = f"{symbol}_{typ}_{channel}"
            self.queues[queue_id] = asyncio.Queue()
            self.tasks.append(asyncio.create_task(self.consume(queue_id, callback, *args, **kwargs)))
            self.tasks.append(asyncio.create_task(self._subscribe(symbol, typ, channel, queue_id)))

    async def close(self):
        self.logger.info("Shutting down WebSocket connections...")
        for task in self.tasks:
            task.cancel()
        await asyncio.gather(*self.tasks, return_exceptions=True)
        self.logger.info("All WebSocket connections closed.")
```
```python
class OkxWebsocketManager(WebsocketManager):
    def __init__(self,
                 config: Dict[str, Any] = None, ping_interval: int = 5,
                 ping_timeout: int = 5,
                 close_timeout: int = 1, max_queue: int = 12, demo_trade: bool = False):
        super().__init__(config, ping_interval, ping_timeout, close_timeout, max_queue)
        self.rate_limiter = Limiter(3/1)
        self.demo_trade = demo_trade

    async def _subscribe(self, symbol: str, typ: Literal["spot", "linear"], channel: Literal["books", "books5", "bbo-tbt", "trades"], queue_id: str):
        """
        Subscribes to a specific symbol and channel on the exchange WebSocket.
        Api documentation: https://www.okx.com/docs-v5/en/#order-book-trading-market-data-ws-order-book-channel

        Args:
            symbol (str): The trading symbol to subscribe to.
            typ (Literal["spot", "linear"]): The type of trading (spot or linear).
            channel (Literal["books", "books5", "bbo-tbt", "trades"]): The channel to subscribe to.
            queue_id (str): The ID of the queue to store the received messages.

        Returns:
            None
        """
        if typ == "spot":
            s = symbol.replace('/USDT', '-USDT')
        else:
            s = symbol.replace('/USDT', '-USDT-SWAP')
        params = [{
            "channel": channel,
            "instId": s
        }]
        while True:
            try:
                await self.rate_limiter.wait()
                async with client.connect(
                    uri=MARKET_URLS["okx"]["demo"]["public"] if self.demo_trade else MARKET_URLS["okx"]["live"]["public"],
                    ping_interval=self.ping_interval,
                    ping_timeout=self.ping_timeout,
                    close_timeout=self.close_timeout,
                    max_queue=self.max_queue
                ) as ws:
                    self.logger.info(f"Connected to {symbol} for {queue_id}")
                    payload = json.dumps({
                        "op": "subscribe",
                        "args": params
                    })
                    await ws.send(payload)
                    async for msg in ws:
                        msg = orjson.loads(msg)
                        await self.queues[queue_id].put(msg)
            except websockets.exceptions.ConnectionClosed as e:
                self.logger.info(f"Connection closed for {queue_id}. Reconnecting...")
            except asyncio.CancelledError:
                self.logger.info(f"Cancelling watch task for {queue_id}")
                break
            except Exception as e:
                self.logger.error(f"Error in watch for {queue_id}: {e}")
                await asyncio.sleep(3)
```
With the subscribe method, I can create subscriptions for multiple symbols, where each symbol is an independent asyncio task. How can I achieve similar functionality using Picows? I'm not very familiar with Picows, but its performance is indeed impressive. I would really appreciate your help!
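The symbol mapping and subscribe payload from `_subscribe` above can be isolated in a small helper that returns bytes ready to be sent by any client. This is only a sketch: the function names are hypothetical, and putting several entries into one `args` list is an assumption about the OKX API that should be checked against its documentation.

```python
import json
from typing import List


def okx_inst_id(symbol: str, typ: str) -> str:
    """Map 'BTC/USDT' to 'BTC-USDT' for spot, or 'BTC-USDT-SWAP' otherwise."""
    suffix = "-USDT" if typ == "spot" else "-USDT-SWAP"
    return symbol.replace("/USDT", suffix)


def okx_subscribe_payload(symbols: List[str], typ: str, channel: str) -> bytes:
    """Build one subscribe message covering every symbol in `symbols`."""
    args = [{"channel": channel, "instId": okx_inst_id(s, typ)} for s in symbols]
    return json.dumps({"op": "subscribe", "args": args}).encode()
```

Calling `okx_subscribe_payload(["BTC/USDT"], "spot", "trades")` reproduces the single-symbol payload used in the code above, so the per-symbol task layout can stay unchanged.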
-
If you want to reproduce exactly the same behavior that you have now with
I'm calling user_callback directly from OKXListener here, which is far more efficient for non-async callbacks than going through the queue. This also simplifies your code since you wouldn't need any
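A rough sketch of the direct-callback idea, under stated assumptions. `OKXListenerSketch` is a hypothetical stand-in, not the `OKXListener` from the reply: it mirrors the `on_ws_connected` and `on_ws_frame` hook names of `picows.WSListener` but does not import picows, so the transport and frame are duck-typed and the text message type is passed in by the caller.

```python
import json
from typing import Any, Callable, Dict, List


class OKXListenerSketch:
    """Subscribes on connect and hands each parsed message straight to the
    user callback, with no intermediate asyncio.Queue."""

    def __init__(self, args: List[Dict[str, str]],
                 user_callback: Callable[[Dict[str, Any]], None],
                 text_msg_type: Any):
        self._args = args
        self._user_callback = user_callback
        # Assumption: a real client would pass picows.WSMsgType.TEXT here.
        self._text = text_msg_type

    def on_ws_connected(self, transport) -> None:
        # Send the subscribe request as soon as the connection is up.
        payload = json.dumps({"op": "subscribe", "args": self._args}).encode()
        transport.send(self._text, payload)

    def on_ws_frame(self, transport, frame) -> None:
        if frame.msg_type != self._text:
            return                      # ignore non-text frames in this sketch
        data = frame.get_payload_as_bytes()
        if data == b"pong":
            return                      # heartbeat reply, not market data
        self._user_callback(json.loads(data))
```

In a real client the class would presumably subclass `picows.WSListener` and be created by the factory passed to `ws_connect`. One `ws_connect` call per symbol, each in its own task, would mirror the per-symbol tasks of the websockets version.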
-
Some servers, like the OKX WebSocket server, require clients to send a `ping` message every 30 seconds.

Similarly, the Bybit WebSocket server recommends sending a `ping` heartbeat every 20 seconds to prevent network or program issues.

How can this be achieved using picows? I previously used `websockets`, which handled this automatically. I would really appreciate it if you could provide an example of how to implement this mechanism.
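The exchange-specific parts of such a heartbeat might look like the sketch below. It rests on assumptions that should be verified against each exchange's documentation: that OKX accepts the plain text `ping` and answers with `pong`, and that Bybit accepts `{"op": "ping"}` and answers with a JSON message carrying `"pong"` in either `op` or `ret_msg`. All constant and function names are hypothetical, and 25 seconds is just an arbitrary value below OKX's 30-second limit.

```python
import json

# Ping payloads (assumed formats, see the note above).
OKX_PING = b"ping"
BYBIT_PING = json.dumps({"op": "ping"}).encode()

# Seconds between pings: under OKX's 30 s limit, Bybit's recommended 20 s.
OKX_PING_INTERVAL = 25
BYBIT_PING_INTERVAL = 20


def is_okx_pong(payload: bytes) -> bool:
    """True if the raw text frame is OKX's reply to a ping."""
    return payload == b"pong"


def is_bybit_pong(payload: bytes) -> bool:
    """True if the JSON frame looks like Bybit's reply to a ping."""
    try:
        msg = json.loads(payload)
    except ValueError:  # not valid JSON, so it cannot be a Bybit pong
        return False
    if not isinstance(msg, dict):
        return False
    return msg.get("op") == "pong" or msg.get("ret_msg") == "pong"
```

With picows, the ping would typically go out as a text frame, for example `transport.send(WSMsgType.TEXT, OKX_PING)`, and the pong check would run inside `on_ws_frame` before the payload is treated as market data.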