22from collections .abc import AsyncGenerator
33
44from types import TracebackType
5- from typing import Any , TypeVar
5+ from typing import Any , TypeAlias , TypeVar
66
77from http import HTTPStatus
88import json
2626
2727T = TypeVar ('T' )
2828
29+ # Type aliases for commonly used types
30+ JsonDict : TypeAlias = dict [str , Any ]
31+ EventCandidateList : TypeAlias = list [EventCandidate ]
32+ PreconditionList : TypeAlias = list [Precondition ]
33+ EventStream : TypeAlias = AsyncGenerator [Event , None ]
34+ EventTypeStream : TypeAlias = AsyncGenerator [EventType , None ]
35+ SubjectStream : TypeAlias = AsyncGenerator [str , None ]
36+
2937
3038class Client ():
3139 def __init__ (
@@ -51,6 +59,14 @@ async def __aexit__(
5159 def http_client (self ) -> HttpClient :
5260 return self .__http_client
5361
62+ def _validate_response (self , response : Response , error_message : str | None = None ) -> None :
63+ """Validate that response comes from EventSourcingDB and has OK status."""
64+ if not is_valid_server_header (response ):
65+ raise ServerError ("Server must be EventSourcingDB" )
66+ if response .status_code != HTTPStatus .OK :
67+ msg = error_message or f"Unexpected response status: { response } "
68+ raise ServerError (msg )
69+
5470 async def ping (self ) -> None :
5571 specversion_field = "specversion"
5672 type_field = "type"
@@ -83,18 +99,12 @@ async def verify_api_token(self) -> None:
8399 request_body = request_body ,
84100 )
85101 async with response :
86- if not is_valid_server_header (response ):
87- raise ServerError ("Server must be EventSourcingDB" )
88- if response .status_code != HTTPStatus .OK :
89- raise ServerError (
90- f'Failed to verify API token: { response } '
91- )
102+ self ._validate_response (response , f"Failed to verify API token: { response } " )
92103
93104 response_data = await response .body .read ()
94105 response_data = bytes .decode (response_data , encoding = 'utf-8' )
95106 response_json = json .loads (response_data )
96107
97- # pylint: disable=R2004
98108 if not isinstance (response_json , dict ) or 'type' not in response_json :
99109 raise ServerError ('Failed to parse response: {response}' )
100110
@@ -104,8 +114,8 @@ async def verify_api_token(self) -> None:
104114
105115 async def write_events (
106116 self ,
107- event_candidates : list [ EventCandidate ] ,
108- preconditions : list [ Precondition ] = None # type: ignore
117+ event_candidates : EventCandidateList ,
118+ preconditions : PreconditionList | None = None
109119 ) -> list [Event ]:
110120 if preconditions is None :
111121 preconditions = []
@@ -117,18 +127,12 @@ async def write_events(
117127 }
118128 )
119129
120- response : Response
121130 response = await self .http_client .post (
122131 path = '/api/v1/write-events' ,
123132 request_body = request_body ,
124133 )
125134
126- if not is_valid_server_header (response ):
127- raise ServerError ("Server must be EventSourcingDB" )
128- if response .status_code != HTTPStatus .OK :
129- raise ServerError (
130- f'Unexpected response status: { response } '
131- )
135+ self ._validate_response (response )
132136
133137 response_data = await response .body .read ()
134138 response_data = bytes .decode (response_data , encoding = 'utf-8' )
@@ -147,7 +151,7 @@ async def read_events(
147151 self ,
148152 subject : str ,
149153 options : ReadEventsOptions
150- ) -> AsyncGenerator [ Event ] :
154+ ) -> EventStream :
151155 request_body = json .dumps ({
152156 'subject' : subject ,
153157 'options' : options .to_json ()
@@ -158,12 +162,7 @@ async def read_events(
158162 )
159163
160164 async with response :
161- if not is_valid_server_header (response ):
162- raise ServerError ("Server must be EventSourcingDB" )
163- if response .status_code != HTTPStatus .OK :
164- raise ServerError (
165- f'Unexpected response status: { response } '
166- )
165+ self ._validate_response (response )
167166 async for raw_message in response .body :
168167 message = parse_raw_message (raw_message )
169168
@@ -190,12 +189,7 @@ async def run_eventql_query(self, query: str) -> AsyncGenerator[Any]:
190189 )
191190
192191 async with response :
193- if not is_valid_server_header (response ):
194- raise ServerError ("Server must be EventSourcingDB" )
195- if response .status_code != HTTPStatus .OK :
196- raise ServerError (
197- f'Unexpected response status: { response } '
198- )
192+ self ._validate_response (response )
199193 async for raw_message in response .body :
200194 message = parse_raw_message (raw_message )
201195
@@ -205,7 +199,6 @@ async def run_eventql_query(self, query: str) -> AsyncGenerator[Any]:
205199 if is_stream_error (message ):
206200 error_message = message .get ('payload' , {}).get ('error' , 'Unknown error' )
207201 raise ServerError (f"{ error_message } ." )
208- # pylint: disable=R2004
209202 if message .get ('type' ) == 'row' :
210203 payload = message ['payload' ]
211204
@@ -221,7 +214,7 @@ async def observe_events(
221214 self ,
222215 subject : str ,
223216 options : ObserveEventsOptions
224- ) -> AsyncGenerator [ Event ] :
217+ ) -> EventStream :
225218 request_body = json .dumps ({
226219 'subject' : subject ,
227220 'options' : options .to_json ()
@@ -233,12 +226,7 @@ async def observe_events(
233226 )
234227
235228 async with response :
236- if not is_valid_server_header (response ):
237- raise ServerError ("Server must be EventSourcingDB" )
238- if response .status_code != HTTPStatus .OK :
239- raise ServerError (
240- f'Unexpected response status: { response } '
241- )
229+ self ._validate_response (response )
242230 async for raw_message in response .body :
243231 message = parse_raw_message (raw_message )
244232
@@ -258,7 +246,7 @@ async def observe_events(
258246 f'{ message } .'
259247 )
260248
261- async def register_event_schema (self , event_type : str , json_schema : dict ) -> None :
249+ async def register_event_schema (self , event_type : str , json_schema : JsonDict ) -> None :
262250 request_body = json .dumps ({
263251 'eventType' : event_type ,
264252 'schema' : json_schema ,
@@ -273,14 +261,14 @@ async def register_event_schema(self, event_type: str, json_schema: dict) -> Non
273261 if not is_valid_server_header (response ):
274262 raise ServerError ("Server must be EventSourcingDB" )
275263 if response .status_code != HTTPStatus .OK :
276- raise ServerError (
277- f'Unexpected response status: { response } '
278- )
264+ error_body = await response . body . read ()
265+ error_text = bytes . decode ( error_body , encoding = "utf-8" )
266+ raise ServerError ( error_text )
279267
280268 async def read_subjects (
281269 self ,
282270 base_subject : str
283- ) -> AsyncGenerator [ str ] :
271+ ) -> SubjectStream :
284272 request_body = json .dumps ({
285273 'baseSubject' : base_subject
286274 })
@@ -291,12 +279,7 @@ async def read_subjects(
291279 )
292280
293281 async with response :
294- if not is_valid_server_header (response ):
295- raise ServerError ("Server must be EventSourcingDB" )
296- if response .status_code != HTTPStatus .OK :
297- raise ServerError (
298- f'Unexpected response status: { response } '
299- )
282+ self ._validate_response (response )
300283 async for raw_message in response .body :
301284 message = parse_raw_message (raw_message )
302285
@@ -325,13 +308,13 @@ async def read_event_type(self, event_type: str) -> EventType:
325308 async with response :
326309 if not is_valid_server_header (response ):
327310 raise ServerError ("Server must be EventSourcingDB" )
328- if response .status_code != HTTPStatus .OK :
329- raise ServerError (
330- f'Unexpected response status: { response } '
331- )
332311
333312 response_data = await response .body .read ()
334313 response_data = bytes .decode (response_data , encoding = 'utf-8' )
314+
315+ if response .status_code != HTTPStatus .OK :
316+ raise ServerError (response_data )
317+
335318 response_json = json .loads (response_data )
336319
337320 if not isinstance (response_json , dict ):
@@ -346,25 +329,19 @@ async def read_event_type(self, event_type: str) -> EventType:
346329 raise InternalError (str (other_error )) from other_error
347330
348331
349- async def read_event_types (self ) -> AsyncGenerator [EventType ]:
350- response : Response
332+ async def read_event_types (self ) -> EventTypeStream :
351333 try :
352334 response = await self .http_client .post (
353335 path = '/api/v1/read-event-types' ,
354336 request_body = '' ,
355337 )
356- except CustomError as custom_error :
357- raise custom_error
358- except Exception as other_error :
359- raise InternalError (str (other_error )) from other_error
338+ except CustomError :
339+ raise
340+ except Exception as error :
341+ raise InternalError (str (error )) from error
360342
361343 async with response :
362- if not is_valid_server_header (response ):
363- raise ServerError ("Server must be EventSourcingDB" )
364- if response .status_code != HTTPStatus .OK :
365- raise ServerError (
366- f'Unexpected response status: { response } '
367- )
344+ self ._validate_response (response )
368345 async for raw_message in response .body :
369346 message = parse_raw_message (raw_message )
370347
0 commit comments