Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add support for optional error handling callback for cases like malfo…
Browse files Browse the repository at this point in the history
…rmed or unexpected responses
idatsy committed Sep 24, 2024
1 parent de1eaa7 commit c495a1b
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions py_questdb/db.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Module for interacting with QuestDB."""

from types import TracebackType
from typing import Iterable, Type, overload, AsyncGenerator, Any, TypeVar
from typing import Iterable, Type, overload, AsyncGenerator, Any, TypeVar, Callable

import aiohttp
import pandas as pd
@@ -156,18 +156,27 @@ async def query(self, query_string: str) -> AsyncGenerator[dict[str, Any], None]
@overload
async def query(self, query_string: str, into_type: Type[T]) -> AsyncGenerator[T, None]: ...

async def query(self, query_string: str, into_type: Type[T] | None = None) -> AsyncGenerator[T, None]:
async def query(
self, query_string: str, into_type: Type[T] | None = None, error_handler: Callable[[bytes], None] | None = None
) -> AsyncGenerator[T, None]:
"""
Perform an asynchronous query and yield the results.
Args:
query_string: The SQL query string to execute.
into_type: Optional type to cast each row into.
error_handler: Optional function to handle errors in the response.
Yields:
Parsed rows as either dictionaries or instances of the specified type.
"""
data = self.parse_query_response(await self._query(query_string))
query_response = await self._query(query_string)
try:
data = self.parse_query_response(query_response)
except Exception as e:
if error_handler:
error_handler(query_response)
raise e

for row in self.parse_and_yield_query_response(data, into_type):
yield row
@@ -178,18 +187,26 @@ def query_sync(self, query_string: str) -> Iterable[dict[str, Any]]: ...
@overload
def query_sync(self, query_string: str, into_type: Type[T]) -> Iterable[T]: ...

def query_sync(self, query_string: str, into_type: Type[T] | None = None) -> Iterable[T] | Iterable[dict]:
def query_sync(
self, query_string: str, into_type: Type[T] | None = None, error_handler: Callable[[bytes], None] | None = None
) -> Iterable[T] | Iterable[dict]:
"""
Perform a synchronous query and return an iterable of results.
Args:
query_string: The SQL query string to execute.
into_type: Optional type to cast each row into.
error_handler: Optional function to handle errors in the response.
Returns:
An iterable of parsed rows as either dictionaries or instances of the specified type.
"""
data = self.parse_query_response(self._query_sync(query_string))
query_response = self._query_sync(query_string)
try:
data = self.parse_query_response(query_response)
except Exception as e:
if error_handler:
error_handler(query_response)
raise e

return self.parse_and_yield_query_response(data, into_type)

0 comments on commit c495a1b

Please sign in to comment.