This repository has been archived by the owner on Jun 24, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_client.py
120 lines (93 loc) · 5.16 KB
/
test_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
""" Test Client for the GraphQL app
This client offers greater flexibility than the FastAPI GraphQL client because:
* It involves fewer application layers and works faster
* You can customize the context before executing any operations
* Errors are reported as Exceptions (rather than JSON objects) and can be analyzed
* Supports subscriptions :)
"""
from __future__ import annotations
import asyncio
import graphql
from contextlib import asynccontextmanager, contextmanager
from typing import Any, Optional
from collections import abc
from apiens.tools.python.threadpool import run_in_threadpool
from .query import GraphQLResult, ContextT, graphql_query_async, graphql_query_sync
class GraphQLTestClient:
    """ Test client that runs GraphQL operations directly against a schema

    The `execute*()` and `subscribe()` methods prepare a context value using
    `init_context_sync()` / `init_context_async()` and then delegate to the
    lower-level `execute_*operation()` / `execute_subscription()` methods, which
    accept an explicit context value.

    Override `init_context_sync()` (and, if necessary, `init_context_async()`)
    to provide your application's context to resolvers.

    The client can be used as a context manager; entering returns the client
    itself and exiting does nothing.
    """

    def __init__(self, schema: graphql.GraphQLSchema, debug: bool = True):
        """ Create a test client

        Args:
            schema: The GraphQL schema to execute operations on
            debug: Debug flag. It is only stored on the instance here; the methods of this class do not read it.
        """
        self.schema = schema
        self.debug = debug

    # Execute queries

    def execute(self, query: str, /, **variables) -> GraphQLResult[ContextT]:
        """ Execute a GraphQL query, in sync mode, but with async resolver support

        Args:
            query: The GraphQL document to execute
            **variables: Query variables. NOTE: `operation_name`, if given, is consumed by `execute_operation()`.
        """
        with self.init_context_sync() as context_value:
            return self.execute_operation(query, context_value, **variables)

    async def execute_async(self, query: str, /, **variables) -> GraphQLResult[ContextT]:
        """ Execute a GraphQL query in async mode

        Args:
            query: The GraphQL document to execute
            **variables: Query variables. NOTE: `operation_name`, if given, is consumed by `execute_async_operation()`.
        """
        async with self.init_context_async() as context_value:
            return await self.execute_async_operation(query, context_value, **variables)

    def execute_sync(self, query: str, /, **variables) -> GraphQLResult[ContextT]:
        """ Execute a GraphQL query in sync mode

        Args:
            query: The GraphQL document to execute
            **variables: Query variables. NOTE: `operation_name`, if given, is consumed by `execute_sync_operation()`.
        """
        with self.init_context_sync() as context_value:
            return self.execute_sync_operation(query, context_value, **variables)

    async def subscribe(self, query: str, /, **variables) -> abc.AsyncIterator[Optional[dict[str, Any]]]:
        """ Execute a GraphQL subscription, in async mode

        Yields whatever `execute_subscription()` yields: the `data` of every subscription event.
        The context stays open for as long as the subscription is being iterated.

        Args:
            query: The GraphQL subscription document
            **variables: Subscription variables
        """
        async with self.init_context_async() as context_value:
            async for res in self.execute_subscription(query, context_value, **variables):
                yield res

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        # Nothing to clean up: the client holds no resources of its own
        pass

    # Prepare context

    @contextmanager
    def init_context_sync(self) -> abc.Iterator[Optional[ContextT]]:
        """ Prepare a context for a GraphQL request

        The default implementation provides no context (`None`).
        Override this method to provide your application's context.
        """
        yield None

    @asynccontextmanager
    async def init_context_async(self) -> abc.AsyncIterator[Optional[ContextT]]:
        """ Prepare a context for a GraphQL request, async mode. Used for GraphQL subscriptions.

        Override this method if your app uses a different way to prepare context in async tests
        """
        # Simply run the same context manager, but using threads.
        # We go through `__enter__()`/`__exit__()` rather than stepping the underlying generator:
        # this way an exception raised inside the `async with` body is delivered to the sync
        # context manager, so it can take its failure path (and possibly suppress the error).
        cm = self.init_context_sync()
        value = await run_in_threadpool(cm.__enter__)
        try:
            yield value
        except BaseException as exc:
            suppressed = await run_in_threadpool(cm.__exit__, type(exc), exc, exc.__traceback__)
            if not suppressed:
                raise
        else:
            await run_in_threadpool(cm.__exit__, None, None, None)

    #region: Execute queries

    def execute_operation(self, query: str, context_value: Any = None, /, operation_name: Optional[str] = None, **variables) -> GraphQLResult[ContextT]:
        """ Execute a GraphQL operation on the schema, with a custom context, in sync mode

        Async mode supports async resolvers. Blocking sync resolvers must be careful to run themselves in threadpool.
        See `apiens.tools.graphql.resolver.resolver_marker` to aid with this.

        NOTE: this method uses `asyncio.run()`, which cannot be called while another
        event loop is running in the same thread. Use `execute_async_operation()` there.

        Args:
            query: The GraphQL document to execute
            context_value: The context value to pass to the execution
            operation_name: The name of the operation to execute
            **variables: Query variables
        """
        return asyncio.run(
            # We run it through the async so that your async methods can run in unit-tests.
            self.execute_async_operation(query, context_value, operation_name=operation_name, **variables)
        )

    async def execute_async_operation(self, query: str, context_value: Any = None, /, operation_name: Optional[str] = None, **variables) -> GraphQLResult[ContextT]:
        """ Execute a GraphQL operation on the schema, async

        Args:
            query: The GraphQL document to execute
            context_value: The context value to pass to the execution
            operation_name: The name of the operation to execute
            **variables: Query variables
        """
        return await graphql_query_async(self.schema, query, context_value, operation_name, **variables)

    def execute_sync_operation(self, query: str, context_value: Any = None, /, operation_name: Optional[str] = None, **variables) -> GraphQLResult[ContextT]:
        """ Execute a GraphQL operation on the schema, with a custom context, in sync mode

        Sync mode assumes that every resolver is a sync function

        Args:
            query: The GraphQL document to execute
            context_value: The context value to pass to the execution
            operation_name: The name of the operation to execute
            **variables: Query variables
        """
        return graphql_query_sync(self.schema, query, context_value, operation_name, **variables)

    async def execute_subscription(self, query: str, context_value: Any = None, **variables) -> abc.AsyncIterator[Optional[dict[str, Any]]]:
        """ Execute a GraphQL subscription

        Yields the `data` of every event produced by the subscription.

        NOTE: unlike the other `execute_*()` methods, `query` and `context_value` are not
        positional-only here, so subscription variables cannot use these two names.

        Args:
            query: The GraphQL subscription document
            context_value: The context value to pass to the execution
            **variables: Subscription variables

        Raises:
            AssertionError: a subscription event has reported errors
        """
        document = graphql.parse(query)

        gen = await graphql.subscribe(
            self.schema,
            document,
            context_value=context_value,
            variable_values=variables,
        )

        # A plain `ExecutionResult` (rather than an iterator of events) means that
        # the subscription could not be started: report its errors as exceptions.
        if isinstance(gen, graphql.ExecutionResult):
            GraphQLResult(gen.formatted, context_value, exceptions=gen.errors).raise_errors()  # type: ignore[arg-type]
        else:
            async for res in gen:
                # Include the errors into the message: a bare assertion failure says nothing about the cause
                assert not res.errors, res.errors
                yield res.data

    #endregion