Skip to content

Commit 1c81cd2

Browse files
authored
Merge pull request Aiven-Open#561 from aiven/jjaakola-aiven-extract-kafka-producing-out-from-registry-logic
refactor: extract Kafka producer from Registry
2 parents d956822 + 00aaa28 commit 1c81cd2

File tree

6 files changed

+207
-162
lines changed

6 files changed

+207
-162
lines changed

karapace/messaging.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
"""
2+
karapace - Karapace producer
3+
4+
Copyright (c) 2023 Aiven Ltd
5+
See LICENSE for details
6+
"""
7+
from kafka import KafkaProducer
8+
from kafka.errors import MessageSizeTooLargeError
9+
from karapace.config import Config
10+
from karapace.errors import SchemaTooLargeException
11+
from karapace.key_format import KeyFormatter
12+
from karapace.offset_watcher import OffsetWatcher
13+
from karapace.utils import json_encode, KarapaceKafkaClient
14+
from karapace.version import __version__
15+
from typing import Any, cast, Dict, Optional, Tuple, Union
16+
17+
import logging
18+
import time
19+
20+
LOG = logging.getLogger(__name__)
21+
X_REGISTRY_VERSION_HEADER = ("X-Registry-Version", f"karapace-{__version__}".encode())
22+
23+
24+
class KarapaceProducer:
    """Kafka producer for the Karapace schemas topic.

    Wraps a ``KafkaProducer`` and, after each send, blocks until the schema
    reader has observed the produced offset, giving callers read-your-writes
    semantics for registry mutations.
    """

    def __init__(self, *, config: Config, offset_watcher: OffsetWatcher, key_formatter: KeyFormatter):
        self._producer: Optional[KafkaProducer] = None
        self._config = config
        self._offset_watcher = offset_watcher
        self._key_formatter = key_formatter
        # Seconds to wait for flush/ack of a single message before failing.
        self._kafka_timeout = 10
        self._schemas_topic = self._config["topic_name"]

        host: str = cast(str, self._config["host"])
        self.x_origin_host_header: Tuple[str, bytes] = ("X-Origin-Host", host.encode("utf8"))

    def initialize_karapace_producer(
        self,
    ) -> None:
        """Create the underlying ``KafkaProducer``, retrying until it succeeds.

        Retries once per second indefinitely; only returns once the producer
        has been constructed.
        """
        while True:
            try:
                self._producer = KafkaProducer(
                    bootstrap_servers=self._config["bootstrap_uri"],
                    security_protocol=self._config["security_protocol"],
                    ssl_cafile=self._config["ssl_cafile"],
                    ssl_certfile=self._config["ssl_certfile"],
                    ssl_keyfile=self._config["ssl_keyfile"],
                    sasl_mechanism=self._config["sasl_mechanism"],
                    sasl_plain_username=self._config["sasl_plain_username"],
                    sasl_plain_password=self._config["sasl_plain_password"],
                    api_version=(1, 0, 0),
                    metadata_max_age_ms=self._config["metadata_max_age_ms"],
                    max_block_ms=2000,  # missing topics will block unless we cache cluster metadata and pre-check
                    connections_max_idle_ms=self._config["connections_max_idle_ms"],  # helps through cluster upgrades ??
                    kafka_client=KarapaceKafkaClient,
                )
                return
            # Was a bare `except:`, which also swallowed SystemExit and
            # KeyboardInterrupt and made this retry loop uninterruptible.
            except Exception:  # pylint: disable=broad-except
                LOG.exception("Unable to create producer, retrying")
                time.sleep(1)

    def close(self) -> None:
        """Close the producer if it was ever initialized (safe to call anytime)."""
        if self._producer is not None:
            self._producer.close()

    def _send_kafka_message(self, key: Union[bytes, str], value: Union[bytes, str]) -> None:
        """Produce one message and wait until the schema reader has consumed it.

        Raises:
            SchemaTooLargeException: if the record exceeds the broker/producer
                message size limit.
            RuntimeError: if the schema reader does not reach the produced
                offset within 60 seconds.
        """
        assert self._producer is not None

        if isinstance(key, str):
            key = key.encode("utf8")
        if isinstance(value, str):
            value = value.encode("utf8")

        future = self._producer.send(
            self._schemas_topic,
            key=key,
            value=value,
            headers=[X_REGISTRY_VERSION_HEADER, self.x_origin_host_header],
        )
        self._producer.flush(timeout=self._kafka_timeout)
        try:
            msg = future.get(self._kafka_timeout)
        except MessageSizeTooLargeError as ex:
            raise SchemaTooLargeException from ex

        sent_offset = msg.offset

        # Fixed log message grammar ("to caught up" -> "to catch up").
        LOG.info(
            "Waiting for schema reader to catch up. key: %r, value: %r, offset: %r",
            key,
            value,
            sent_offset,
        )

        if self._offset_watcher.wait_for_offset(sent_offset, timeout=60) is True:
            LOG.info(
                "Schema reader has found key. key: %r, value: %r, offset: %r",
                key,
                value,
                sent_offset,
            )
        else:
            raise RuntimeError(
                "Schema reader timed out while looking for key. key: {!r}, value: {!r}, offset: {}".format(
                    key, value, sent_offset
                )
            )

    def send_message(self, *, key: Dict[str, Any], value: Optional[Dict[str, Any]]) -> None:
        """Format and produce a registry message.

        Args:
            key: Message key, serialized via the configured KeyFormatter.
            value: Message payload; ``None`` produces an empty value
                (a tombstone-style record).
        """
        key_bytes = self._key_formatter.format_key(key)
        value_bytes: Union[bytes, str] = b""
        if value is not None:
            value_bytes = json_encode(value, binary=True, compact=True)
        self._send_kafka_message(key=key_bytes, value=value_bytes)

karapace/offset_watcher.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""
2+
karapace - Karapace offset watcher
3+
4+
Copyright (c) 2023 Aiven Ltd
5+
See LICENSE for details
6+
"""
7+
from threading import Condition
8+
9+
10+
class OffsetWatcher:
    """Synchronization container for threads to wait until an offset is seen.

    This works under the assumption offsets are used only once, which should be
    correct as long as no unclean leader election is performed.
    """

    def __init__(self) -> None:
        # Every read-modify-write of _greatest_offset must hold this condition.
        self._condition = Condition()
        # -1 sentinel: 0 is a valid first offset, so "nothing seen yet" must sit below it.
        self._greatest_offset = -1

    def greatest_offset(self) -> int:
        """Return the largest offset observed so far (-1 if none seen yet)."""
        return self._greatest_offset

    def offset_seen(self, new_offset: int) -> None:
        """Record that *new_offset* was consumed and wake all waiting threads."""
        with self._condition:
            if new_offset > self._greatest_offset:
                self._greatest_offset = new_offset
            self._condition.notify_all()

    def wait_for_offset(self, expected_offset: int, timeout: float) -> bool:
        """Block until expected_offset is seen.

        Args:
            expected_offset: The message offset generated by the producer.
            timeout: How long the caller will wait for the offset in seconds.

        Returns:
            True if the offset was reached within the timeout, False otherwise.
        """
        with self._condition:
            return self._condition.wait_for(
                lambda: self._greatest_offset >= expected_offset,
                timeout=timeout,
            )

karapace/schema_reader.py

Lines changed: 6 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@
2121
from karapace.in_memory_database import InMemoryDatabase
2222
from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode
2323
from karapace.master_coordinator import MasterCoordinator
24+
from karapace.offset_watcher import OffsetWatcher
2425
from karapace.schema_models import SchemaType, TypedSchema
2526
from karapace.statsd import StatsClient
2627
from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient
27-
from threading import Condition, Event, Thread
28+
from threading import Event, Thread
2829
from typing import Optional
2930

3031
import logging
@@ -102,42 +103,11 @@ def new_schema_topic_from_config(config: Config) -> NewTopic:
102103
)
103104

104105

105-
class OffsetsWatcher:
106-
"""Synchronization container for threads to wait until an offset is seen.
107-
108-
This works under the assumption offsets are used only once, which should be
109-
correct as long as no unclean leader election is performed.
110-
"""
111-
112-
def __init__(self) -> None:
113-
# Condition used to protected _greatest_offset, any modifications to that object must
114-
# be performed with this condition acquired
115-
self._condition = Condition()
116-
self._greatest_offset = -1 # Would fail if initially this is 0 as it will be first offset ever.
117-
118-
def greatest_offset(self) -> int:
119-
return self._greatest_offset
120-
121-
def offset_seen(self, new_offset: int) -> None:
122-
with self._condition:
123-
self._greatest_offset = max(self._greatest_offset, new_offset)
124-
self._condition.notify_all()
125-
126-
def wait_for_offset(self, expected_offset: int, timeout: float) -> bool:
127-
"""Block until expected_offset is seen.
128-
129-
Args:
130-
expected_offset: The message offset generated by the producer.
131-
timeout: How long the caller will wait for the offset in seconds.
132-
"""
133-
with self._condition:
134-
return self._condition.wait_for(lambda: expected_offset <= self._greatest_offset, timeout=timeout)
135-
136-
137106
class KafkaSchemaReader(Thread):
138107
def __init__(
139108
self,
140109
config: Config,
110+
offset_watcher: OffsetWatcher,
141111
key_formatter: KeyFormatter,
142112
database: InMemoryDatabase,
143113
master_coordinator: Optional[MasterCoordinator] = None,
@@ -150,7 +120,7 @@ def __init__(
150120
self.admin_client: Optional[KafkaAdminClient] = None
151121
self.topic_replication_factor = self.config["replication_factor"]
152122
self.consumer: Optional[KafkaConsumer] = None
153-
self.offset_watcher = OffsetsWatcher()
123+
self._offset_watcher = offset_watcher
154124
self.stats = StatsClient(config=config)
155125

156126
# Thread synchronization objects
@@ -293,7 +263,7 @@ def _is_ready(self) -> bool:
293263
return self.offset >= self._highest_offset
294264

295265
def highest_offset(self) -> int:
296-
return max(self._highest_offset, self.offset_watcher.greatest_offset())
266+
return max(self._highest_offset, self._offset_watcher.greatest_offset())
297267

298268
def handle_messages(self) -> None:
299269
assert self.consumer is not None, "Thread must be started"
@@ -348,7 +318,7 @@ def handle_messages(self) -> None:
348318
schema_records_processed_keymode_deprecated_karapace += 1
349319

350320
if self.ready and watch_offsets:
351-
self.offset_watcher.offset_seen(self.offset)
321+
self._offset_watcher.offset_seen(self.offset)
352322

353323
self._report_schema_metrics(
354324
schema_records_processed_keymode_canonical,

0 commit comments

Comments
 (0)