Skip to content

Commit

Permalink
Support Describe log dirs
Browse files Browse the repository at this point in the history
I implemented API KEY 35 from the official Apache Kafka documentation. This functionality is requested in issue # 2163 and this is an implementation proposal.
  • Loading branch information
Courouge committed Nov 21, 2021
1 parent f0a57a6 commit cfa60f9
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
18 changes: 17 additions & 1 deletion kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
DeleteGroupsRequest
DeleteGroupsRequest, DescribeLogDirsRequest
)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
Expand Down Expand Up @@ -1344,3 +1344,19 @@ def _wait_for_futures(self, futures):

if future.failed():
raise future.exception # pylint: disable-msg=raising-bad-type

def describe_log_dirs(self):
"""Send a DescribeLogDirsRequest request to a broker.
:return: A message future
"""
version = self._matching_api_version(DescribeLogDirsRequest)
if version <= 1:
request = DescribeLogDirsRequest[version]()
future = self._send_request_to_node(self._client.least_loaded_node(), request)
self._wait_for_futures([future])
else:
raise NotImplementedError(
"Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient."
.format(version))
return future.value
42 changes: 42 additions & 0 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,48 @@ class DescribeConfigsRequest_v2(Request):
]


class DescribeLogDirsResponse_v0(Response):
API_KEY = 35
API_VERSION = 0
FLEXIBLE_VERSION = True
SCHEMA = Schema(
('throttle_time_ms', Int32),
('log_dirs', Array(
('error_code', Int16),
('log_dir', String('utf-8')),
('topics', Array(
('name', String('utf-8')),
('partitions', Array(
('partition_index', Int32),
('partition_size', Int64),
('offset_lag', Int64),
('is_future_key', Boolean)
))
))
))
)


class DescribeLogDirsRequest_v0(Request):
API_KEY = 35
API_VERSION = 0
RESPONSE_TYPE = DescribeLogDirsResponse_v0
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
('partitions', Int32)
))
)


DescribeLogDirsResponse = [
DescribeLogDirsResponse_v0,
]
DescribeLogDirsRequest = [
DescribeLogDirsRequest_v0,
]


class SaslAuthenticateResponse_v0(Response):
API_KEY = 36
API_VERSION = 0
Expand Down

0 comments on commit cfa60f9

Please sign in to comment.