Fix errors raised by new version of Pylint so tests pass again
wbarnha committed Jul 8, 2024
1 parent acce0e4 commit 7d1eb7e
Showing 3 changed files with 12 additions and 3 deletions.
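
All three files fix the same shape of Pylint complaint, most likely the possibly-used-before-assignment check (E0606) introduced in Pylint 3.2: a name assigned on only some branches and then read unconditionally. A minimal sketch of code that trips it (stub logic, not from this repo):

```python
def decode(codec: str, data: bytes) -> bytes:
    if codec == "gzip":
        out = data  # stand-in for gzip_decode(data)
    elif codec == "snappy":
        out = data  # stand-in for snappy_decode(data)
    # E0606: `out` is unassigned when codec matches neither branch.
    return out
```

The commit closes the gap in the two standard ways: add a final `else` that raises, or initialize a sentinel and check it before use.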
5 changes: 5 additions & 0 deletions kafka/admin/client.py
@@ -503,6 +503,8 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
                 topics=topics,
                 allow_auto_topic_creation=auto_topic_creation
             )
+        else:
+            raise IncompatibleBrokerVersion(f"MetadataRequest for {version} is not supported")
 
         future = self._send_request_to_node(
             self._client.least_loaded_node(),
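
The added branch makes the version dispatch exhaustive, so every path either binds `request` or raises. A simplified sketch of the resulting control flow, with tuples standing in for the real `MetadataRequest` classes and the `version <= 3` arm assumed from surrounding code rather than shown in this hunk:

```python
class IncompatibleBrokerVersion(Exception):
    """Local stand-in for kafka.errors.IncompatibleBrokerVersion."""

def build_metadata_request(version, topics=None, auto_topic_creation=False):
    if version <= 3:
        request = ("MetadataRequest", version, topics)
    elif version <= 5:
        request = ("MetadataRequest", version, topics, auto_topic_creation)
    else:
        # Without this else, `request` would be unbound for version > 5.
        raise IncompatibleBrokerVersion(f"MetadataRequest for {version} is not supported")
    return request

try:
    build_metadata_request(version=9)
except IncompatibleBrokerVersion as exc:
    print(exc)  # MetadataRequest for 9 is not supported
```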
@@ -1010,6 +1012,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id,
     def _describe_consumer_groups_process_response(self, response):
         """Process a DescribeGroupsResponse into a group description."""
         if response.API_VERSION <= 3:
+            group_description = None
             assert len(response.groups) == 1
             for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
                 if isinstance(response_field, Array):
@@ -1045,6 +1048,8 @@ def _describe_consumer_groups_process_response(self, response):
                     if response.API_VERSION <=2:
                         described_group_information_list.append(None)
                     group_description = GroupInformation._make(described_group_information_list)
+            if group_description is None:
+                raise Errors.BrokerResponseError("No group description received")
             error_code = group_description.error_code
             error_type = Errors.for_code(error_code)
             # Java has the note: KAFKA-6789, we can retry based on the error code
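
The two hunks above use the other standard remedy: seed `group_description = None` before the loop, then verify the sentinel afterwards so both Pylint and the runtime can see the name is bound before `group_description.error_code` is read. A generic sketch of the pattern:

```python
def first_match(items, predicate):
    match = None  # sentinel: bound on every code path
    for item in items:
        if predicate(item):
            match = item
    if match is None:
        # Mirrors the new Errors.BrokerResponseError guard in the diff.
        raise ValueError("no matching item received")
    return match

print(first_match([1, 4, 9], lambda n: n % 2 == 0))  # 4
```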
8 changes: 5 additions & 3 deletions kafka/record/default_records.py
@@ -187,12 +187,14 @@ def _maybe_uncompress(self) -> None:
                 data = memoryview(self._buffer)[self._pos:]
                 if compression_type == self.CODEC_GZIP:
                     uncompressed = gzip_decode(data)
-                if compression_type == self.CODEC_SNAPPY:
+                elif compression_type == self.CODEC_SNAPPY:
                     uncompressed = snappy_decode(data.tobytes())
-                if compression_type == self.CODEC_LZ4:
+                elif compression_type == self.CODEC_LZ4:
                     uncompressed = lz4_decode(data.tobytes())
-                if compression_type == self.CODEC_ZSTD:
+                elif compression_type == self.CODEC_ZSTD:
                     uncompressed = zstd_decode(data.tobytes())
+                else:
+                    raise NotImplementedError(f"Compression type {compression_type} is not supported")
                 self._buffer = bytearray(uncompressed)
                 self._pos = 0
         self._decompressed = True
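
Before this change, an unrecognized codec id fell through all four independent `if` tests and the method crashed one line later with an UnboundLocalError on `uncompressed`; the `elif` chain with a final `else` now fails first, with a message naming the codec. A minimal before/after sketch (two codecs stand in for four):

```python
def old_style(codec: int, data: bytes) -> bytes:
    if codec == 0x01:
        uncompressed = data
    if codec == 0x02:
        uncompressed = data
    return uncompressed  # codec 0x7F: UnboundLocalError here

def new_style(codec: int, data: bytes) -> bytes:
    if codec == 0x01:
        uncompressed = data
    elif codec == 0x02:
        uncompressed = data
    else:
        raise NotImplementedError(f"Compression type {codec} is not supported")
    return uncompressed
```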
2 changes: 2 additions & 0 deletions kafka/record/legacy_records.py
@@ -461,6 +461,8 @@ def _maybe_compress(self) -> bool:
                     compressed = lz4_encode_old_kafka(data)
                 else:
                     compressed = lz4_encode(data)
+            else:
+                raise NotImplementedError(f"Compression type {self._compression_type} is not supported")
             size = self.size_in_bytes(
                 0, timestamp=0, key=None, value=compressed)
             # We will try to reuse the same buffer if we have enough space
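
The legacy writer gets the same guard on the encode path. A hedged regression-test sketch for the new behavior; `maybe_compress` here is a local stand-in with the same branch shape as the patched `_maybe_compress`, not the library API:

```python
import pytest

CODEC_GZIP, CODEC_SNAPPY, CODEC_LZ4 = 0x01, 0x02, 0x03

def maybe_compress(compression_type: int, data: bytes) -> bytes:
    # Identity stubs in place of gzip_encode / snappy_encode / lz4_encode.
    if compression_type == CODEC_GZIP:
        return data
    elif compression_type == CODEC_SNAPPY:
        return data
    elif compression_type == CODEC_LZ4:
        return data
    else:
        raise NotImplementedError(f"Compression type {compression_type} is not supported")

def test_unknown_codec_raises():
    with pytest.raises(NotImplementedError):
        maybe_compress(0x7F, b"payload")
```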
