Skip to content

Commit 2d9f1e8

Browse files
committed
schema-reader: Wrap shutdown feature around flag
We'd like the shutdown logic on corrupt schema records to be guarded by a feature flag so we do not surprise customers; this is disabled by default.
1 parent 1cdd0f1 commit 2d9f1e8

File tree

10 files changed

+187
-50
lines changed

10 files changed

+187
-50
lines changed

README.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,13 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
468468
* - ``master_election_strategy``
469469
- ``lowest``
470470
- Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)
471+
* - ``kafka_schema_reader_strict_mode``
472+
- ``false``
473+
- If enabled, causes the Karapace schema-registry service to shutdown when there are invalid schema records in the `_schemas` topic
474+
* - ``kafka_retriable_errors_silenced``
475+
- ``true``
476+
- If enabled, kafka errors which can be retried or custom errors specified for the service will not be raised,
477+
instead, a warning log is emitted. This will denoise issue tracking systems, e.g. Sentry
471478

472479

473480
Authentication and authorization of Karapace Schema Registry REST API

container/compose.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ services:
8080
KARAPACE_COMPATIBILITY: FULL
8181
KARAPACE_STATSD_HOST: statsd-exporter
8282
KARAPACE_STATSD_PORT: 8125
83+
KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
84+
KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true
8385

8486
karapace-rest:
8587
image: ghcr.io/aiven-open/karapace:develop
@@ -106,6 +108,8 @@ services:
106108
KARAPACE_LOG_LEVEL: WARNING
107109
KARAPACE_STATSD_HOST: statsd-exporter
108110
KARAPACE_STATSD_PORT: 8125
111+
KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
112+
KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true
109113

110114
prometheus:
111115
image: prom/prometheus

container/prometheus/rules.yml

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,19 @@ groups:
33
rules:
44
- record: karapace_exceptions_sum_by_exception
55
expr: sum by (exception) (exception)
6-
- alert: HighHTTPRequests
7-
expr: karapace_http_requests_total > 10
8-
for: 5m
6+
- alert: user_alert_karapace_high_non_success_http_requests
7+
expr: sum by (instance) (count_over_time(karapace_http_requests_total{status!~'^2.*'}[1m])) > 5
8+
for: 2m
99
labels:
1010
severity: warning
1111
annotations:
12-
summary: High HTTP requests for (instance={{ $labels.instance }})
13-
description: "Service received\n HTTP Requests = {{ $value }}\n"
14-
- alert: FireImmidiately
15-
expr: karapace_schema_reader_schemas > 1
12+
summary: High failing HTTP requests for (instance={{ $labels.instance }})
13+
description: "Service returned too many non-success HTTP responses = {{ $value }}\n"
14+
- alert: user_alert_karapace_frequent_restart
15+
expr: sum by (app)(count_over_time(karapace_shutdown_count[30s])) > 1
16+
for: 2m
1617
labels:
17-
severity: page
18+
severity: critical
1819
annotations:
19-
summary: Lots of schems on (instance={{ $labels.instance }})
20-
description: "\n Schema count = {{ $value }}\n"
20+
summary: Karapace service instance={{ $labels.instance }} restarting frequently.
21+
description: "Service is experiencing frequent restarts count={{ $value }}\n"

karapace/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ class Config(TypedDict):
8080
protobuf_runtime_directory: str
8181
statsd_host: str
8282
statsd_port: int
83+
kafka_schema_reader_strict_mode: bool
84+
kafka_retriable_errors_silenced: bool
8385

8486
sentry: NotRequired[Mapping[str, object]]
8587
tags: NotRequired[Mapping[str, object]]
@@ -154,6 +156,8 @@ class ConfigDefaults(Config, total=False):
154156
"protobuf_runtime_directory": "runtime",
155157
"statsd_host": "127.0.0.1",
156158
"statsd_port": 8125,
159+
"kafka_schema_reader_strict_mode": False,
160+
"kafka_retriable_errors_silenced": True,
157161
}
158162
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]
159163

karapace/coordinator/master_coordinator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ async def start(self) -> None:
5050
await self._kafka_client.bootstrap()
5151
break
5252
except KafkaConnectionError:
53-
LOG.exception("Kafka client bootstrap failed.")
53+
LOG.warning("Kafka client bootstrap failed.")
5454
await asyncio.sleep(0.5)
5555

5656
while not self._kafka_client.cluster.brokers():

karapace/kafka_error_handler.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""
2+
Copyright (c) 2024 Aiven Ltd
3+
See LICENSE for details
4+
"""
5+
6+
from karapace.config import Config
7+
from karapace.errors import CorruptKafkaRecordException
8+
from karapace.typing import StrEnum
9+
10+
import aiokafka.errors as Errors
11+
import enum
12+
import logging
13+
14+
LOG = logging.getLogger(__name__)
15+
16+
17+
class KafkaErrorLocation(StrEnum):
18+
SCHEMA_COORDINATOR = "SCHEMA_COORDINATOR"
19+
SCHEMA_READER = "SCHEMA_READER"
20+
21+
22+
class KafkaRetriableErrors(enum.Enum):
23+
SCHEMA_COORDINATOR = (Errors.NodeNotReadyError,)
24+
25+
26+
class KafkaErrorHandler:
27+
def __init__(self, config: Config) -> None:
28+
self.schema_reader_strict_mode: bool = config["kafka_schema_reader_strict_mode"]
29+
self.retriable_errors_silenced: bool = config["kafka_retriable_errors_silenced"]
30+
31+
def log(self, location: KafkaErrorLocation, error: BaseException) -> None:
32+
LOG.warning("%s encountered error - %s", location, error)
33+
34+
def handle_schema_coordinator_error(self, error: BaseException) -> None:
35+
if getattr(error, "retriable", False) or (
36+
error in KafkaRetriableErrors[KafkaErrorLocation.SCHEMA_COORDINATOR].value
37+
):
38+
self.log(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=error)
39+
if not self.retriable_errors_silenced:
40+
raise error
41+
42+
def handle_schema_reader_error(self, error: BaseException) -> None:
43+
if self.schema_reader_strict_mode:
44+
raise CorruptKafkaRecordException from error
45+
46+
def handle_error(self, location: KafkaErrorLocation, error: BaseException) -> None:
47+
return getattr(self, f"handle_{location.lower()}_error")(error=error)

karapace/schema_reader.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@
2828
from karapace.config import Config
2929
from karapace.coordinator.master_coordinator import MasterCoordinator
3030
from karapace.dependency import Dependency
31-
from karapace.errors import CorruptKafkaRecordException, InvalidReferences, InvalidSchema, ShutdownException
31+
from karapace.errors import InvalidReferences, InvalidSchema, ShutdownException
3232
from karapace.in_memory_database import InMemoryDatabase
3333
from karapace.kafka.admin import KafkaAdminClient
3434
from karapace.kafka.common import translate_from_kafkaerror
3535
from karapace.kafka.consumer import KafkaConsumer
36+
from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation
3637
from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode
3738
from karapace.offset_watcher import OffsetWatcher
3839
from karapace.protobuf.exception import ProtobufException
@@ -141,6 +142,7 @@ def __init__(
141142
self.consumer: KafkaConsumer | None = None
142143
self._offset_watcher = offset_watcher
143144
self.stats = StatsClient(config=config)
145+
self.kafka_error_handler: KafkaErrorHandler = KafkaErrorHandler(config=config)
144146

145147
# Thread synchronization objects
146148
# - offset is used by the REST API to wait until this thread has
@@ -185,7 +187,8 @@ def run(self) -> None:
185187
LOG.warning("[Admin Client] No Brokers available yet. Retrying")
186188
self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
187189
except KafkaConfigurationError:
188-
LOG.info("[Admin Client] Invalid configuration. Bailing")
190+
LOG.warning("[Admin Client] Invalid configuration. Bailing")
191+
self._stop_schema_reader.set()
189192
raise
190193
except Exception as e: # pylint: disable=broad-except
191194
LOG.exception("[Admin Client] Unexpected exception. Retrying")
@@ -202,9 +205,9 @@ def run(self) -> None:
202205
LOG.warning("[Consumer] No Brokers available yet. Retrying")
203206
self._stop_schema_reader.wait(timeout=KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS)
204207
except KafkaConfigurationError:
205-
LOG.info("[Consumer] Invalid configuration. Bailing")
208+
LOG.warning("[Consumer] Invalid configuration. Bailing")
206209
self._stop_schema_reader.set()
207-
shutdown()
210+
raise
208211
except Exception as e: # pylint: disable=broad-except
209212
LOG.exception("[Consumer] Unexpected exception. Retrying")
210213
self.stats.unexpected_exception(ex=e, where="consumer_instantiation")
@@ -249,7 +252,7 @@ def run(self) -> None:
249252
shutdown()
250253
except Exception as e: # pylint: disable=broad-except
251254
self.stats.unexpected_exception(ex=e, where="schema_reader_loop")
252-
LOG.exception("Unexpected exception in schema reader loop")
255+
LOG.warning("Unexpected exception in schema reader loop - %s", e)
253256

254257
def _get_beginning_offset(self) -> int:
255258
assert self.consumer is not None, "Thread must be started"
@@ -359,15 +362,19 @@ def handle_messages(self) -> None:
359362
key = json_decode(message_key)
360363
except JSONDecodeError as exc:
361364
LOG.warning("Invalid JSON in msg.key() at offset %s", msg.offset())
362-
raise CorruptKafkaRecordException from exc
365+
self.offset = msg.offset() # Invalid entry shall also move the offset so Karapace makes progress.
366+
self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
367+
continue # [non-strict mode]
363368
except (GroupAuthorizationFailedError, TopicAuthorizationFailedError) as exc:
364369
LOG.error(
365370
"Kafka authorization error when consuming from %s: %s %s",
366371
self.config["topic_name"],
367372
exc,
368373
msg.error(),
369374
)
370-
raise ShutdownException from exc
375+
if self.kafka_error_handler.schema_reader_strict_mode:
376+
raise ShutdownException from exc
377+
continue
371378

372379
assert isinstance(key, dict)
373380
msg_keymode = KeyMode.CANONICAL if is_key_in_canonical_format(key) else KeyMode.DEPRECATED_KARAPACE
@@ -386,13 +393,17 @@ def handle_messages(self) -> None:
386393
value = self._parse_message_value(message_value)
387394
except (JSONDecodeError, TypeError) as exc:
388395
LOG.warning("Invalid JSON in msg.value() at offset %s", msg.offset())
389-
raise CorruptKafkaRecordException from exc
396+
self.offset = msg.offset() # Invalid entry shall also move the offset so Karapace makes progress.
397+
self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
398+
continue # [non-strict mode]
390399

391400
try:
392401
self.handle_msg(key, value)
393-
self.offset = msg.offset()
394402
except (InvalidSchema, TypeError) as exc:
395-
raise CorruptKafkaRecordException from exc
403+
self.kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=exc)
404+
continue
405+
finally:
406+
self.offset = msg.offset()
396407

397408
if msg_keymode == KeyMode.CANONICAL:
398409
schema_records_processed_keymode_canonical += 1

tests/integration/backup/test_legacy_backup.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
Copyright (c) 2023 Aiven Ltd
55
See LICENSE for details
66
"""
7-
from aiohttp.client_exceptions import ClientError
87
from aiokafka.errors import InvalidTopicError
98
from karapace.backup import api
109
from karapace.backup.api import BackupVersion
@@ -237,14 +236,9 @@ async def test_backup_restore(
237236
topic_name=api.normalize_topic_name(None, config),
238237
)
239238
time.sleep(1.0)
240-
241-
# Restoring a `v1` backup with an invalid schema stops the service as expected, but I am
242-
# unsure why the logic mismatch, needs further investigation.
243-
if backup_file_version == "v1":
244-
with pytest.raises(ClientError):
245-
await registry_async_client.get(f"subjects/{subject}/versions")
246-
else:
247-
await registry_async_client.get(f"subjects/{subject}/versions")
239+
res = await registry_async_client.get(f"subjects/{subject}/versions")
240+
assert res.status_code == 200
241+
assert res.json() == [1]
248242

249243
_assert_canonical_key_format(
250244
bootstrap_servers=kafka_servers.bootstrap_servers, schemas_topic=registry_cluster.schemas_topic
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
"""
2+
Copyright (c) 2024 Aiven Ltd
3+
See LICENSE for details
4+
"""
5+
from _pytest.logging import LogCaptureFixture
6+
from karapace.errors import CorruptKafkaRecordException
7+
from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation
8+
9+
import aiokafka.errors as Errors
10+
import logging
11+
import pytest
12+
13+
14+
@pytest.fixture(name="kafka_error_handler")
15+
def fixture_kafka_error_handler() -> KafkaErrorHandler:
16+
config = {
17+
"kafka_schema_reader_strict_mode": False,
18+
"kafka_retriable_errors_silenced": True,
19+
}
20+
return KafkaErrorHandler(config=config)
21+
22+
23+
@pytest.mark.parametrize(
24+
"retriable_error",
25+
[
26+
Errors.NodeNotReadyError("node is still starting"),
27+
Errors.GroupCoordinatorNotAvailableError("group is unavailable"),
28+
Errors.NoBrokersAvailable("no brokers available"),
29+
],
30+
)
31+
def test_handle_error_retriable_schema_coordinator(
32+
caplog: LogCaptureFixture,
33+
kafka_error_handler: KafkaErrorHandler,
34+
retriable_error: Errors.KafkaError,
35+
):
36+
kafka_error_handler.retriable_errors_silenced = True
37+
with caplog.at_level(logging.WARNING, logger="karapace.error_handler"):
38+
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=retriable_error)
39+
40+
for log in caplog.records:
41+
assert log.name == "karapace.kafka_error_handler"
42+
assert log.levelname == "WARNING"
43+
assert log.message == f"SCHEMA_COORDINATOR encountered error - {retriable_error}"
44+
45+
# Check that the config flag - `kafka_retriable_errors_silenced` switches the behaviour
46+
kafka_error_handler.retriable_errors_silenced = False
47+
with pytest.raises(retriable_error.__class__):
48+
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=retriable_error)
49+
50+
51+
@pytest.mark.parametrize(
52+
"nonretriable_error",
53+
[
54+
ValueError("value missing"),
55+
Errors.GroupAuthorizationFailedError("authorization failed"),
56+
Errors.InvalidCommitOffsetSizeError("invalid commit size"),
57+
],
58+
)
59+
def test_handle_error_nonretriable_schema_coordinator(
60+
kafka_error_handler: KafkaErrorHandler, nonretriable_error: BaseException
61+
) -> None:
62+
kafka_error_handler.retriable_errors_silenced = False
63+
with pytest.raises(nonretriable_error.__class__):
64+
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=nonretriable_error)
65+
66+
# Check that the config flag - `kafka_retriable_errors_silenced` switches the behaviour
67+
kafka_error_handler.retriable_errors_silenced = True
68+
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_COORDINATOR, error=nonretriable_error)
69+
70+
71+
def test_handle_error_schema_reader(kafka_error_handler: KafkaErrorHandler) -> None:
72+
kafka_error_handler.schema_reader_strict_mode = True
73+
with pytest.raises(CorruptKafkaRecordException):
74+
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=Exception)
75+
76+
# Check that the config flag - `kafka_schema_reader_strict_mode` switches the behaviour
77+
kafka_error_handler.schema_reader_strict_mode = False
78+
kafka_error_handler.handle_error(location=KafkaErrorLocation.SCHEMA_READER, error=Exception)

tests/unit/test_schema_reader.py

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from confluent_kafka import Message
1111
from dataclasses import dataclass
1212
from karapace.config import DEFAULTS
13-
from karapace.errors import CorruptKafkaRecordException
1413
from karapace.in_memory_database import InMemoryDatabase
1514
from karapace.kafka.consumer import KafkaConsumer
1615
from karapace.key_format import KeyFormatter
@@ -200,15 +199,7 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche
200199
consumer_mock = Mock(spec=KafkaConsumer)
201200

202201
schema_str = json.dumps(
203-
{
204-
"subject": "test",
205-
"version": 1,
206-
"id": 1,
207-
"deleted": False,
208-
"schema": json.dumps(
209-
{"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]}
210-
),
211-
}
202+
{"name": "init", "type": "record", "fields": [{"name": "inner", "type": ["string", "int"]}]}
212203
).encode()
213204

214205
ok1_message = Mock(spec=Message)
@@ -246,16 +237,16 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche
246237
schema_reader.handle_messages()
247238
assert schema_reader.offset == 1
248239
assert schema_reader.ready is False
249-
250-
with pytest.raises(CorruptKafkaRecordException):
251-
schema_reader.handle_messages()
252-
assert schema_reader.offset == 1
253-
assert schema_reader.ready is False
254-
255-
with pytest.raises(CorruptKafkaRecordException):
256-
schema_reader.handle_messages()
257-
assert schema_reader.offset == 1
258-
assert schema_reader.ready is False
240+
schema_reader.handle_messages()
241+
assert schema_reader.offset == 2
242+
assert schema_reader.ready is False
243+
schema_reader.handle_messages()
244+
assert schema_reader.offset == 3
245+
assert schema_reader.ready is False
246+
schema_reader.handle_messages() # call last time to call _is_ready()
247+
assert schema_reader.offset == 3
248+
assert schema_reader.ready is True
249+
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
259250

260251

261252
def test_soft_deleted_schema_storing() -> None:

0 commit comments

Comments
 (0)