From c495a1b95151fc376b45f16dc1bf33ce4c655b64 Mon Sep 17 00:00:00 2001 From: idatsy Date: Tue, 24 Sep 2024 13:59:34 +0100 Subject: [PATCH] add support for optional error handling callback for cases like malformed or unexpected responses --- py_questdb/db.py | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/py_questdb/db.py b/py_questdb/db.py index 1584969..8def55a 100644 --- a/py_questdb/db.py +++ b/py_questdb/db.py @@ -1,7 +1,7 @@ """Module for interacting with QuestDB.""" from types import TracebackType -from typing import Iterable, Type, overload, AsyncGenerator, Any, TypeVar +from typing import Iterable, Type, overload, AsyncGenerator, Any, TypeVar, Callable import aiohttp import pandas as pd @@ -156,18 +156,27 @@ async def query(self, query_string: str) -> AsyncGenerator[dict[str, Any], None] @overload async def query(self, query_string: str, into_type: Type[T]) -> AsyncGenerator[T, None]: ... - async def query(self, query_string: str, into_type: Type[T] | None = None) -> AsyncGenerator[T, None]: + async def query( + self, query_string: str, into_type: Type[T] | None = None, error_handler: Callable[[bytes], None] | None = None + ) -> AsyncGenerator[T, None]: """ Perform an asynchronous query and yield the results. Args: query_string: The SQL query string to execute. into_type: Optional type to cast each row into. + error_handler: Optional function to handle errors in the response. Yields: Parsed rows as either dictionaries or instances of the specified type. """ - data = self.parse_query_response(await self._query(query_string)) + query_response = await self._query(query_string) + try: + data = self.parse_query_response(query_response) + except Exception as e: + if error_handler: + error_handler(query_response) + raise e for row in self.parse_and_yield_query_response(data, into_type): yield row @@ -178,18 +187,26 @@ def query_sync(self, query_string: str) -> Iterable[dict[str, Any]]: ... @overload def query_sync(self, query_string: str, into_type: Type[T]) -> Iterable[T]: ... - def query_sync(self, query_string: str, into_type: Type[T] | None = None) -> Iterable[T] | Iterable[dict]: + def query_sync( + self, query_string: str, into_type: Type[T] | None = None, error_handler: Callable[[bytes], None] | None = None + ) -> Iterable[T] | Iterable[dict]: """ Perform a synchronous query and return an iterable of results. Args: query_string: The SQL query string to execute. into_type: Optional type to cast each row into. - + error_handler: Optional function to handle errors in the response. Returns: An iterable of parsed rows as either dictionaries or instances of the specified type. """ - data = self.parse_query_response(self._query_sync(query_string)) + query_response = self._query_sync(query_string) + try: + data = self.parse_query_response(query_response) + except Exception as e: + if error_handler: + error_handler(query_response) + raise e return self.parse_and_yield_query_response(data, into_type)