Skip to content

Commit

Permalink
Patch pylint warnings so tests pass again (#184)
Browse files Browse the repository at this point in the history
* stop pylint complaints about uncovered conditional flow

* add todo to revisit

* formatting makes me happy :)

* Fix errors raised by new version of Pylint so tests pass again
  • Loading branch information
wbarnha authored Jul 12, 2024
1 parent 448017e commit 5e461a7
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 3 deletions.
5 changes: 5 additions & 0 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
topics=topics,
allow_auto_topic_creation=auto_topic_creation
)
else:
raise IncompatibleBrokerVersion(f"MetadataRequest for {version} is not supported")

future = self._send_request_to_node(
self._client.least_loaded_node(),
Expand Down Expand Up @@ -1010,6 +1012,7 @@ def _describe_consumer_groups_send_request(self, group_id, group_coordinator_id,
def _describe_consumer_groups_process_response(self, response):
"""Process a DescribeGroupsResponse into a group description."""
if response.API_VERSION <= 3:
group_description = None
assert len(response.groups) == 1
for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
if isinstance(response_field, Array):
Expand Down Expand Up @@ -1045,6 +1048,8 @@ def _describe_consumer_groups_process_response(self, response):
if response.API_VERSION <=2:
described_group_information_list.append(None)
group_description = GroupInformation._make(described_group_information_list)
if group_description is None:
raise Errors.BrokerResponseError("No group description received")
error_code = group_description.error_code
error_type = Errors.for_code(error_code)
# Java has the note: KAFKA-6789, we can retry based on the error code
Expand Down
5 changes: 5 additions & 0 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,10 +628,15 @@ def _send_offset_commit_request(self, offsets):
) for partition, offset in partitions.items()]
) for topic, partitions in offset_data.items()]
)
else:
# TODO: We really shouldn't need this here to begin with, but I'd like to get
# pylint to stop complaining.
raise Exception(f"Unsupported Broker API: {self.config['api_version']}")

log.debug("Sending offset-commit request with %s for group %s to %s",
offsets, self.group_id, node_id)


future = Future()
_f = self._client.send(node_id, request)
_f.add_callback(self._handle_offset_commit_response, offsets, future, time.time())
Expand Down
8 changes: 5 additions & 3 deletions kafka/record/default_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,14 @@ def _maybe_uncompress(self) -> None:
data = memoryview(self._buffer)[self._pos:]
if compression_type == self.CODEC_GZIP:
uncompressed = gzip_decode(data)
if compression_type == self.CODEC_SNAPPY:
elif compression_type == self.CODEC_SNAPPY:
uncompressed = snappy_decode(data.tobytes())
if compression_type == self.CODEC_LZ4:
elif compression_type == self.CODEC_LZ4:
uncompressed = lz4_decode(data.tobytes())
if compression_type == self.CODEC_ZSTD:
elif compression_type == self.CODEC_ZSTD:
uncompressed = zstd_decode(data.tobytes())
else:
raise NotImplementedError(f"Compression type {compression_type} is not supported")
self._buffer = bytearray(uncompressed)
self._pos = 0
self._decompressed = True
Expand Down
2 changes: 2 additions & 0 deletions kafka/record/legacy_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,8 @@ def _maybe_compress(self) -> bool:
compressed = lz4_encode_old_kafka(data)
else:
compressed = lz4_encode(data)
else:
raise NotImplementedError(f"Compression type {self._compression_type} is not supported")
size = self.size_in_bytes(
0, timestamp=0, key=None, value=compressed)
# We will try to reuse the same buffer if we have enough space
Expand Down

0 comments on commit 5e461a7

Please sign in to comment.