Skip to content

Commit ec75a34

Browse files
Merge pull request #660 from aiven/giuseppelillo/fix-producer-error
fix: Correctly raise producer exceptions
2 parents b13553a + 86fac38 commit ec75a34

File tree

2 files changed

+61
-22
lines changed

2 files changed

+61
-22
lines changed

karapace/backup/api.py

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from kafka.admin import KafkaAdminClient, NewTopic
2828
from kafka.consumer.fetcher import ConsumerRecord
2929
from kafka.errors import KafkaError, TopicAlreadyExistsError
30-
from kafka.future import Future
3130
from kafka.structs import PartitionMetadata, TopicPartition
3231
from karapace import constants
3332
from karapace.backup.backends.v1 import SchemaBackupV1Reader
@@ -40,7 +39,7 @@
4039
from pathlib import Path
4140
from rich.console import Console
4241
from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed
43-
from typing import AbstractSet, Callable, Collection, Iterator, Literal, Mapping, NewType, NoReturn, TypeVar
42+
from typing import AbstractSet, Callable, Collection, Iterator, Literal, Mapping, NewType, TypeVar
4443

4544
import contextlib
4645
import datetime
@@ -263,16 +262,6 @@ def _consumer(config: Config, topic: str) -> Iterator[KafkaConsumer]:
263262
yield consumer
264263

265264

266-
@contextlib.contextmanager
267-
def _enable_producer_callback_errors() -> Iterator[None]:
268-
global_value = Future.error_on_callbacks
269-
Future.error_on_callbacks = True
270-
try:
271-
yield None
272-
finally:
273-
Future.error_on_callbacks = global_value
274-
275-
276265
@contextlib.contextmanager
277266
def _producer(config: Config, topic: str) -> Iterator[KafkaProducer]:
278267
"""Creates an automatically closing Kafka producer client.
@@ -282,10 +271,9 @@ def _producer(config: Config, topic: str) -> Iterator[KafkaProducer]:
282271
:raises PartitionCountError: if the topic does not have exactly one partition.
283272
:raises Exception: if client creation fails, concrete exception types are unknown, see Kafka implementation.
284273
"""
285-
with _enable_producer_callback_errors():
286-
with kafka_producer_from_config(config) as producer:
287-
__check_partition_count(topic, producer.partitions_for)
288-
yield producer
274+
with kafka_producer_from_config(config) as producer:
275+
__check_partition_count(topic, producer.partitions_for)
276+
yield producer
289277

290278

291279
def _normalize_location(input_location: str) -> Path | StdOut:
@@ -390,13 +378,10 @@ def _handle_restore_topic(
390378
raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists")
391379

392380

393-
def _raise_backup_error(exception: Exception) -> NoReturn:
394-
raise BackupDataRestorationError("Error while producing restored messages") from exception
395-
396-
397381
def _handle_producer_send(
398382
instruction: ProducerSend,
399383
producer: KafkaProducer,
384+
producer_error_callback: Callable[[Exception], None],
400385
) -> None:
401386
LOG.debug(
402387
"Sending kafka msg key: %r, value: %r",
@@ -411,7 +396,7 @@ def _handle_producer_send(
411396
partition=instruction.partition_index,
412397
headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers],
413398
timestamp_ms=instruction.timestamp,
414-
).add_errback(_raise_backup_error)
399+
).add_errback(producer_error_callback)
415400
except (KafkaError, AssertionError) as ex:
416401
raise BackupDataRestorationError("Error while calling send on restoring messages") from ex
417402

@@ -446,11 +431,23 @@ def restore_backup(
446431
LOG.info("Identified backup backend: %s", backend.__class__.__name__)
447432
LOG.info("Starting backup restore for topic: %r", topic_name)
448433

434+
# Stores the latest exception raised by the error callback set on producer.send()
435+
_producer_exception = None
436+
449437
# We set up an ExitStack context, so that we can enter the producer context only
450438
# after processing a RestoreTopic instruction.
451439
with contextlib.ExitStack() as stack:
452440
producer = None
453441

442+
def _producer_error_callback(exception: Exception) -> None:
443+
LOG.error("Producer error", exc_info=exception)
444+
nonlocal _producer_exception
445+
_producer_exception = exception
446+
447+
def _check_producer_exception() -> None:
448+
if _producer_exception is not None:
449+
raise BackupDataRestorationError("Error while producing restored messages") from _producer_exception
450+
454451
for instruction in backend.read(backup_location, topic_name):
455452
if isinstance(instruction, RestoreTopicLegacy):
456453
_handle_restore_topic_legacy(instruction, config, skip_topic_creation)
@@ -461,10 +458,20 @@ def restore_backup(
461458
elif isinstance(instruction, ProducerSend):
462459
if producer is None:
463460
raise RuntimeError("Backend has not yet sent RestoreTopic.")
464-
_handle_producer_send(instruction, producer)
461+
_handle_producer_send(instruction, producer, _producer_error_callback)
462+
# Immediately check if producer.send() generated an exception. This call is
463+
# only an optimization, as producing is asynchronous and no sends might
464+
# have been executed once we reach this line.
465+
_check_producer_exception()
465466
else:
466467
assert_never(instruction)
467468

469+
# Check if an exception was raised after the producer was flushed and closed
470+
# by `kafka_producer_from_config` context manager. As opposed to the previous
471+
# call, this one is essential for correct behavior, as when we reach this point the
472+
# producer can no longer be sending messages (it has been flushed and closed).
473+
_check_producer_exception()
474+
468475

469476
def create_backup(
470477
config: Config,

tests/integration/backup/test_v3_backup.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,38 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
620620
)
621621

622622

623+
def test_producer_raises_exceptions(
624+
admin_client: KafkaAdminClient,
625+
kafka_servers: KafkaServers,
626+
) -> None:
627+
topic_name = "596ddf6b"
628+
backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / topic_name
629+
metadata_path = backup_directory / f"{topic_name}.metadata"
630+
631+
# Make sure topic doesn't exist beforehand.
632+
try:
633+
admin_client.delete_topics([topic_name])
634+
except UnknownTopicOrPartitionError:
635+
print("No previously existing topic.")
636+
else:
637+
print("Deleted topic from previous run.")
638+
639+
config = set_config_defaults(
640+
{
641+
"bootstrap_uri": kafka_servers.bootstrap_servers,
642+
}
643+
)
644+
645+
with patch("kafka.producer.record_accumulator.RecordAccumulator.append") as p:
646+
p.side_effect = UnknownTopicOrPartitionError()
647+
with pytest.raises(BackupDataRestorationError):
648+
api.restore_backup(
649+
config=config,
650+
backup_location=metadata_path,
651+
topic_name=TopicName(topic_name),
652+
)
653+
654+
623655
def no_color_env() -> dict[str, str]:
624656
env = os.environ.copy()
625657
try:

0 commit comments

Comments (0)