Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delete records method support for kafka admin api #2062

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
59 changes: 58 additions & 1 deletion kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest, \
DeleteRecordsRequest)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.structs import TopicPartition, OffsetAndMetadata
Expand Down Expand Up @@ -948,6 +949,62 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
.format(version))
return self._send_request_to_controller(request)

def delete_records(self, records_to_delete, timeout_ms=None):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add a new parameter holding the cutoff offset, i.e. the offset below which records are deleted.

def delete_records(self, records_to_delete, offset_to_delete, timeout_ms=None):

"""Delete records whose offset is smaller than the given offset of the corresponding partition.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please include a note that errors must be checked as they are not raised (at least not currently).

:param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the
given partitions.

:return: List of DeleteRecordsResponse
"""
timeout_ms = self._validate_timeout(timeout_ms)
version = self._matching_api_version(MetadataRequest)

topics = set()

for topic2partition in records_to_delete:
topics.add(topic2partition.topic)

request = MetadataRequest[version](
topics=list(topics),
allow_auto_topic_creation=False
)

future = self._send_request_to_node(self._client.least_loaded_node(), request)

self._wait_for_futures([future])
response = future.value

version = self._matching_api_version(DeleteRecordsRequest)

PARTITIONS_INFO = 3
NAME = 1
PARTITION_INDEX = 1
LEADER = 2

partition2leader = dict()

for topic in response.topics:
for partition in topic[PARTITIONS_INFO]:
t2p = TopicPartition(topic=topic[NAME], partition=partition[PARTITION_INDEX])
partition2leader[t2p] = partition[LEADER]

responses = []

for topic2partition in records_to_delete:
request = DeleteRecordsRequest[version](
topics=[(topic2partition.topic, [(topic2partition.partition, records_to_delete[topic2partition])])],

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here, pass the offset_to_delete parameter instead of records_to_delete[topic2partition].

topics=[(topic2partition.topic, [(topic2partition.partition, offset_to_delete)])]

timeout_ms=timeout_ms
)
# Sending separate request for each partition leader
future = self._send_request_to_node(partition2leader[topic2partition], request)
self._wait_for_futures([future])

response = future.value
responses.append(response)

return responses

# delete records protocol not yet implemented
# Note: send the request to the partition leaders
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this note should be deleted given that this PR implements it. 😄


Expand Down
33 changes: 32 additions & 1 deletion kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,38 @@ class DeleteTopicsRequest_v3(Request):
]


class DeleteRecordsResponse_v0(Response):
    """DeleteRecords response, version 0 (API key 21).

    Fields are declared in wire order: the throttle time comes first,
    followed by the per-topic results. Each partition entry carries its
    partition index, the resulting low watermark and an error code.

    Note: failures are reported through the per-partition ``error_code``
    field, so callers need to inspect it on every partition entry.
    """
    API_KEY = 21
    API_VERSION = 0
    SCHEMA = Schema(
        # throttle_time_ms precedes the topics array in the v0 response.
        ('throttle_time_ms', Int32),
        ('topics', Array(
            ('name', String('utf-8')),
            ('partitions', Array(
                ('partition_index', Int32),
                ('low_watermark', Int64),
                ('error_code', Int16))))),
    )


class DeleteRecordsRequest_v0(Request):
    """DeleteRecords request, version 0 (API key 21).

    Carries, for each topic, a list of ``(partition_index, offset)`` pairs,
    followed by a request timeout in milliseconds. The matching response
    class is ``DeleteRecordsResponse_v0``.
    """
    API_VERSION = 0
    API_KEY = 21
    RESPONSE_TYPE = DeleteRecordsResponse_v0
    SCHEMA = Schema(
        (
            'topics',
            Array(
                ('name', String('utf-8')),
                (
                    'partitions',
                    Array(
                        ('partition_index', Int32),
                        ('offset', Int64),
                    ),
                ),
            ),
        ),
        ('timeout_ms', Int32),
    )


# Version-indexed lookup tables: entry ``i`` is the class for API version ``i``.
DeleteRecordsRequest = [
    DeleteRecordsRequest_v0,
]
DeleteRecordsResponse = [
    DeleteRecordsResponse_v0,
]


class ListGroupsResponse_v0(Response):
API_KEY = 16
API_VERSION = 0
Expand Down Expand Up @@ -881,4 +913,3 @@ class CreatePartitionsRequest_v1(Request):
CreatePartitionsResponse = [
CreatePartitionsResponse_v0, CreatePartitionsResponse_v1,
]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why delete the newline at the end?