Skip to content

Commit 117f635

Browse files
feat: use OTel in core metrics
1 parent 31df8bd commit 117f635

31 files changed

+308
-145
lines changed

src/karapace/__main__.py

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from karapace.api.container import SchemaRegistryContainer
77
from karapace.api.factory import create_karapace_application, karapace_schema_registry_lifespan
88
from karapace.api.telemetry.container import TelemetryContainer
9+
from karapace.core.auth_container import AuthContainer
910
from karapace.core.container import KarapaceContainer
1011

1112
import karapace.api.controller
@@ -18,48 +19,79 @@
1819
import karapace.api.routers.mode
1920
import karapace.api.routers.schemas
2021
import karapace.api.routers.subjects
21-
import karapace.api.telemetry.meter
22+
import karapace.core.instrumentation.meter
2223
import karapace.api.telemetry.middleware
2324
import karapace.api.telemetry.setup
2425
import karapace.api.telemetry.tracer
2526
import karapace.api.user
2627
import uvicorn
2728

29+
from karapace.core.metrics_container import MetricsContainer
30+
2831
if __name__ == "__main__":
2932
karapace_container = KarapaceContainer()
3033
karapace_container.wire(
3134
modules=[
3235
__name__,
33-
karapace.api.controller,
3436
karapace.api.telemetry.tracer,
35-
karapace.api.telemetry.meter,
37+
karapace.core.instrumentation.meter,
3638
]
3739
)
3840

39-
telemetry_container = TelemetryContainer(karapace_container=karapace_container)
40-
telemetry_container.wire(
41+
auth_container = AuthContainer(
42+
karapace_container=karapace_container,
43+
)
44+
auth_container.wire(
45+
modules=[
46+
karapace.api.controller,
47+
karapace.api.factory,
48+
karapace.api.routers.compatibility,
49+
karapace.api.routers.config,
50+
karapace.api.routers.mode,
51+
karapace.api.routers.schemas,
52+
karapace.api.routers.subjects,
53+
karapace.api.user,
54+
]
55+
)
56+
57+
metrics_container = MetricsContainer(
58+
karapace_container=karapace_container,
59+
)
60+
metrics_container.wire(
4161
modules=[
62+
karapace.api.factory,
4263
karapace.api.telemetry.setup,
64+
]
65+
)
66+
67+
telemetry_container = TelemetryContainer(
68+
karapace_container=karapace_container,
69+
metrics_container=metrics_container,
70+
)
71+
telemetry_container.wire(
72+
modules=[
4373
karapace.api.telemetry.middleware,
74+
karapace.api.telemetry.setup,
4475
]
4576
)
4677

4778
schema_registry_container = SchemaRegistryContainer(
48-
karapace_container=karapace_container, telemetry_container=telemetry_container
79+
karapace_container=karapace_container,
80+
metrics_container=metrics_container,
81+
telemetry_container=telemetry_container,
4982
)
5083
schema_registry_container.wire(
5184
modules=[
5285
__name__,
5386
karapace.api.factory,
54-
karapace.api.user,
87+
karapace.api.routers.compatibility,
88+
karapace.api.routers.config,
5589
karapace.api.routers.health,
90+
karapace.api.routers.master_availability,
5691
karapace.api.routers.metrics,
57-
karapace.api.routers.subjects,
58-
karapace.api.routers.schemas,
59-
karapace.api.routers.config,
60-
karapace.api.routers.compatibility,
6192
karapace.api.routers.mode,
62-
karapace.api.routers.master_availability,
93+
karapace.api.routers.schemas,
94+
karapace.api.routers.subjects,
6395
]
6496
)
6597

src/karapace/api/container.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,24 @@
77
from karapace.api.controller import KarapaceSchemaRegistryController
88
from karapace.api.telemetry.container import TelemetryContainer
99
from karapace.core.container import KarapaceContainer
10+
from karapace.core.metrics_container import MetricsContainer
1011
from karapace.core.schema_registry import KarapaceSchemaRegistry
1112

1213

1314
class SchemaRegistryContainer(containers.DeclarativeContainer):
1415
karapace_container = providers.Container(KarapaceContainer)
16+
metrics_container = providers.Container(MetricsContainer)
1517
telemetry_container = providers.Container(TelemetryContainer)
1618

17-
schema_registry = providers.Singleton(KarapaceSchemaRegistry, config=karapace_container.config)
19+
schema_registry = providers.Singleton(
20+
KarapaceSchemaRegistry,
21+
config=karapace_container.config,
22+
stats=metrics_container.stats,
23+
)
1824

1925
schema_registry_controller = providers.Singleton(
2026
KarapaceSchemaRegistryController,
2127
config=karapace_container.config,
2228
schema_registry=schema_registry,
23-
stats=karapace_container.statsd,
29+
stats=metrics_container.stats,
2430
)

src/karapace/api/controller.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@
2525
SubjectVersion,
2626
)
2727
from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User
28+
from karapace.core.auth_container import AuthContainer
2829
from karapace.core.compatibility import CompatibilityModes
2930
from karapace.core.compatibility.jsonschema.checks import is_incompatible
3031
from karapace.core.compatibility.schema_compatibility import SchemaCompatibility
3132
from karapace.core.config import Config
32-
from karapace.core.container import KarapaceContainer
3333
from karapace.core.errors import (
3434
IncompatibleSchema,
3535
InvalidReferences,
@@ -57,7 +57,7 @@
5757
)
5858
from karapace.core.schema_references import LatestVersionReference, Reference
5959
from karapace.core.schema_registry import KarapaceSchemaRegistry
60-
from karapace.core.statsd import StatsClient
60+
from karapace.core.stats import StatsClient
6161
from karapace.core.typing import JsonData, JsonObject, SchemaId, Subject, Version
6262
from karapace.core.utils import JSONDecodeError
6363
from typing import Any, cast
@@ -71,8 +71,6 @@
7171

7272
class KarapaceSchemaRegistryController:
7373
def __init__(self, config: Config, schema_registry: KarapaceSchemaRegistry, stats: StatsClient) -> None:
74-
# super().__init__(config=config, not_ready_handler=self._forward_if_not_ready_to_serve)
75-
7674
self.config = config
7775
self._process_start_time = time.monotonic()
7876
self.stats = stats
@@ -157,7 +155,7 @@ async def schemas_list(
157155
deleted: bool,
158156
latest_only: bool,
159157
user: User | None,
160-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]),
158+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
161159
) -> list[SchemaListingItem]:
162160
schemas = await self.schema_registry.schemas_list(include_deleted=deleted, latest_only=latest_only)
163161
response_schemas: list[SchemaListingItem] = []
@@ -190,7 +188,7 @@ async def schemas_get(
190188
include_subjects: bool,
191189
format_serialized: str,
192190
user: User | None,
193-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]),
191+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
194192
) -> SchemasResponse:
195193
try:
196194
parsed_schema_id = SchemaId(int(schema_id))
@@ -267,7 +265,7 @@ async def schemas_get_versions(
267265
schema_id: str,
268266
deleted: bool,
269267
user: User | None,
270-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]),
268+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
271269
) -> list[SubjectVersion]:
272270
try:
273271
schema_id_int = SchemaId(int(schema_id))
@@ -387,7 +385,7 @@ async def subjects_list(
387385
self,
388386
deleted: bool,
389387
user: User | None,
390-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]),
388+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
391389
) -> list[str]:
392390
subjects = [str(subject) for subject in self.schema_registry.database.find_subjects(include_deleted=deleted)]
393391
if authorizer:

src/karapace/api/factory.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
from karapace.api.routers.setup import setup_routers
1616
from karapace.api.telemetry.setup import setup_metering, setup_tracing
1717
from karapace.core.auth import AuthenticatorAndAuthorizer
18+
from karapace.core.auth_container import AuthContainer
1819
from karapace.core.config import Config
1920
from karapace.core.logging_setup import configure_logging, log_config_without_secrets
21+
from karapace.core.metrics_container import MetricsContainer
2022
from karapace.core.schema_registry import KarapaceSchemaRegistry
21-
from karapace.core.statsd import StatsClient
23+
from karapace.core.stats import StatsClient
2224
from typing import AsyncContextManager
2325

2426
import logging
@@ -29,20 +31,20 @@
2931
async def karapace_schema_registry_lifespan(
3032
_: FastAPI,
3133
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
32-
stastd: StatsClient = Depends(Provide[SchemaRegistryContainer.karapace_container.statsd]),
34+
stats: StatsClient = Depends(Provide[MetricsContainer.stats]),
3335
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
34-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
36+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
3537
) -> AsyncGenerator[None, None]:
3638
try:
3739
await schema_registry.start()
38-
await authorizer.start(stats=stastd)
40+
await authorizer.start(stats=stats)
3941

4042
yield
4143
finally:
4244
await schema_registry.close()
4345
await authorizer.close()
4446
await forward_client.close()
45-
stastd.close()
47+
stats.close()
4648

4749

4850
def create_karapace_application(

src/karapace/api/routers/compatibility.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from karapace.api.routers.requests import CompatibilityCheckResponse, SchemaRequest
1313
from karapace.api.user import get_current_user
1414
from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User
15+
from karapace.core.auth_container import AuthContainer
1516
from karapace.core.typing import Subject
1617
from typing import Annotated
1718
from urllib.parse import unquote_plus
@@ -31,7 +32,7 @@ async def compatibility_post(
3132
version: str, # TODO support actual Version object
3233
schema_request: SchemaRequest,
3334
user: Annotated[User, Depends(get_current_user)],
34-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
35+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
3536
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
3637
) -> CompatibilityCheckResponse:
3738
subject = Subject(unquote_plus(subject))

src/karapace/api/routers/config.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from karapace.api.routers.requests import CompatibilityLevelResponse, CompatibilityRequest, CompatibilityResponse
1414
from karapace.api.user import get_current_user
1515
from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User
16+
from karapace.core.auth_container import AuthContainer
1617
from karapace.core.schema_registry import KarapaceSchemaRegistry
1718
from karapace.core.typing import Subject
1819
from typing import Annotated
@@ -30,7 +31,7 @@
3031
@inject
3132
async def config_get(
3233
user: Annotated[User, Depends(get_current_user)],
33-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
34+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
3435
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
3536
) -> CompatibilityLevelResponse:
3637
if authorizer and not authorizer.check_authorization(user, Operation.Read, "Config:"):
@@ -47,7 +48,7 @@ async def config_put(
4748
user: Annotated[User, Depends(get_current_user)],
4849
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
4950
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
50-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
51+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
5152
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
5253
) -> CompatibilityResponse:
5354
if authorizer and not authorizer.check_authorization(user, Operation.Write, "Config:"):
@@ -69,7 +70,7 @@ async def config_get_subject(
6970
subject: Subject,
7071
user: Annotated[User, Depends(get_current_user)],
7172
defaultToGlobal: bool = False,
72-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
73+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
7374
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
7475
) -> CompatibilityLevelResponse:
7576
subject = Subject(unquote_plus(subject))
@@ -88,7 +89,7 @@ async def config_set_subject(
8889
user: Annotated[User, Depends(get_current_user)],
8990
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
9091
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
91-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
92+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
9293
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
9394
) -> CompatibilityResponse:
9495
subject = Subject(unquote_plus(subject))
@@ -113,7 +114,7 @@ async def config_delete_subject(
113114
user: Annotated[User, Depends(get_current_user)],
114115
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
115116
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
116-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
117+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
117118
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
118119
) -> CompatibilityResponse:
119120
subject = Subject(unquote_plus(subject))

src/karapace/api/routers/mode.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from karapace.api.routers.requests import ModeResponse
1313
from karapace.api.user import get_current_user
1414
from karapace.core.auth import AuthenticatorAndAuthorizer, Operation, User
15+
from karapace.core.auth_container import AuthContainer
1516
from karapace.core.typing import Subject
1617
from typing import Annotated
1718
from urllib.parse import unquote_plus
@@ -28,7 +29,7 @@
2829
@inject
2930
async def mode_get(
3031
user: Annotated[User, Depends(get_current_user)],
31-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
32+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
3233
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
3334
) -> ModeResponse:
3435
if authorizer and not authorizer.check_authorization(user, Operation.Read, "Config:"):
@@ -42,7 +43,7 @@ async def mode_get(
4243
async def mode_get_subject(
4344
subject: Subject,
4445
user: Annotated[User, Depends(get_current_user)],
45-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
46+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
4647
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
4748
) -> ModeResponse:
4849
subject = Subject(unquote_plus(subject))

src/karapace/api/routers/schemas.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
from karapace.core.auth import AuthenticatorAndAuthorizer, User
1313
from typing import Annotated
1414

15+
from karapace.core.auth_container import AuthContainer
16+
1517
schemas_router = APIRouter(
1618
prefix="/schemas",
1719
tags=["schemas"],
@@ -26,7 +28,7 @@ async def schemas_get_list(
2628
user: Annotated[User, Depends(get_current_user)],
2729
deleted: bool = False,
2830
latestOnly: bool = False,
29-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
31+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
3032
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
3133
) -> list[SchemaListingItem]:
3234
return await controller.schemas_list(
@@ -45,7 +47,7 @@ async def schemas_get(
4547
includeSubjects: bool = False, # TODO: include subjects?
4648
fetchMaxId: bool = False, # TODO: fetch max id?
4749
format_serialized: str = Query("", alias="format"),
48-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
50+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
4951
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
5052
) -> SchemasResponse:
5153
return await controller.schemas_get(
@@ -72,7 +74,7 @@ async def schemas_get_versions(
7274
user: Annotated[User, Depends(get_current_user)],
7375
schema_id: str,
7476
deleted: bool = False,
75-
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
77+
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[AuthContainer.authorizer]),
7678
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
7779
) -> list[SubjectVersion]:
7880
return await controller.schemas_get_versions(

0 commit comments

Comments
 (0)