Add delete records method support for kafka admin api #2062

Open
wants to merge 11 commits into
base: master
from


@10101010 10101010 commented Jun 1, 2020

Hello. Sometimes it is useful to delete messages from a Kafka topic without resorting to the trick of temporarily setting a 1-second retention time.

I tried to implement this the same way the Java client does:
https://github.com/apache/kafka/blob/f98e176746d663fadedbcd3c18312a7f476a20c8/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L2279

It solves the problem for me, and I hope it can be helpful to somebody else.


Contributor

jeffwidman commented Sep 17, 2020

Can you add a basic unit test of this? Should be fairly straightforward to use some of the existing pytest fixtures to spin up a broker with a new topic, then use a producer to write to it, then use this code to delete some of the messages and then verify only the deleted messages are gone.
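The check described above could be sketched roughly as follows. To keep the example runnable on its own, the broker is replaced by a hypothetical in-memory `FakePartitionLog` (not part of kafka-python); a real test would instead use the project's pytest fixtures, a producer, and the `delete_records` method from this PR. Offsets beyond the end of the log are simply rejected here, which is a simplification.

```python
class FakePartitionLog:
    """Hypothetical in-memory stand-in for one partition's log (not kafka-python API)."""

    def __init__(self):
        self._records = []      # list of (offset, value) pairs
        self._next_offset = 0   # offset the next appended record will get

    def append(self, value):
        offset = self._next_offset
        self._records.append((offset, value))
        self._next_offset += 1
        return offset

    def delete_records_before(self, offset):
        """Drop every record whose offset is smaller than `offset`.

        Returns the new low watermark (earliest offset still available).
        """
        if offset > self._next_offset:
            raise ValueError("offset is past the end of the log")
        self._records = [(o, v) for o, v in self._records if o >= offset]
        return self._records[0][0] if self._records else self._next_offset

    def offsets(self):
        return [o for o, _ in self._records]


def test_delete_records_removes_only_older_offsets():
    log = FakePartitionLog()
    for i in range(10):
        log.append("msg-%d" % i)

    low_watermark = log.delete_records_before(4)

    # Only offsets 0..3 are gone; everything from offset 4 onward survives.
    assert low_watermark == 4
    assert log.offsets() == [4, 5, 6, 7, 8, 9]
```

The two assertions mirror what the review asks for: the deleted messages are gone, and nothing else is.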

responses.append(response)

return responses

# delete records protocol not yet implemented
# Note: send the request to the partition leaders
Contributor


Also, this note should be deleted given that this PR implements it. 😄

@@ -881,4 +913,3 @@ class CreatePartitionsRequest_v1(Request):
CreatePartitionsResponse = [
CreatePartitionsResponse_v0, CreatePartitionsResponse_v1,
]

Contributor


Why delete the newline at the end?

@@ -948,6 +949,62 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
.format(version))
        return self._send_request_to_controller(request)

    def delete_records(self, records_to_delete, timeout_ms=None):
        """Delete records whose offset is smaller than the given offset of the corresponding partition.

Contributor


Please include a note that errors must be checked as they are not raised (at least not currently).
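One way a caller might perform that check is sketched below. It assumes each response exposes a `topics` field shaped like the Kafka DeleteRecords v0 response, that is, a list of `(name, [(partition_index, low_watermark, error_code), ...])` tuples. The `FakeResponse` type and the `check_delete_records_responses` helper are hypothetical and are not part of kafka-python or of this PR.

```python
from collections import namedtuple

# Hypothetical stand-in for a decoded DeleteRecords response object.
FakeResponse = namedtuple("FakeResponse", ["throttle_time_ms", "topics"])


def check_delete_records_responses(responses):
    """Return {(topic, partition): low_watermark}; raise if any partition failed."""
    low_watermarks = {}
    failures = []
    for response in responses:
        for topic, partitions in response.topics:
            for partition, low_watermark, error_code in partitions:
                if error_code != 0:  # 0 means "no error" in the Kafka protocol
                    failures.append((topic, partition, error_code))
                else:
                    low_watermarks[(topic, partition)] = low_watermark
    if failures:
        raise RuntimeError("delete_records failed for: %r" % (failures,))
    return low_watermarks


# Example: one successful response covering two partitions.
ok = FakeResponse(0, [("events", [(0, 42, 0), (1, 7, 0)])])
print(check_delete_records_responses([ok]))
```

In real code, the numeric error code could probably be mapped to an exception class with `kafka.errors.for_code` rather than a generic `RuntimeError`.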

@@ -948,6 +949,62 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
.format(version))
        return self._send_request_to_controller(request)

    def delete_records(self, records_to_delete, timeout_ms=None):


It would be good to add a new parameter that holds the offset, so that records with offsets smaller than it are the ones deleted.

def delete_records(self, records_to_delete, offset_to_delete, timeout_ms=None):


        for topic2partition in records_to_delete:
            request = DeleteRecordsRequest[version](
                topics=[(topic2partition.topic, [(topic2partition.partition, records_to_delete[topic2partition])])],


And here, pass the offset_to_delete parameter rather than records_to_delete[topic2partition].

topics=[(topic2partition.topic, [(topic2partition.partition, offset_to_delete)])]
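To make the difference between the two signatures concrete, here is a sketch of how the per-request `topics` payload could be built under each of them. `TopicPartition` is redefined locally as a namedtuple so the example runs standalone, and both helper functions are hypothetical illustrations, not code from the PR.

```python
from collections import namedtuple

# Local stand-in so the example is self-contained.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def topics_from_mapping(records_to_delete):
    """Shape used in the PR: {TopicPartition: offset}, one offset per partition."""
    return [
        [(tp.topic, [(tp.partition, offset)])]
        for tp, offset in records_to_delete.items()
    ]


def topics_from_single_offset(partitions, offset_to_delete):
    """Shape suggested in review: the same offset applied to every partition."""
    return [
        [(tp.topic, [(tp.partition, offset_to_delete)])]
        for tp in partitions
    ]


tp0 = TopicPartition("events", 0)
tp1 = TopicPartition("events", 1)

# One payload per partition, matching the one-request-per-partition loop above.
print(topics_from_mapping({tp0: 10, tp1: 25}))
print(topics_from_single_offset([tp0, tp1], 10))
```

With the mapping, partitions can be truncated to different offsets in one call; with a single `offset_to_delete`, every listed partition gets the same offset.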

wbarnha and others added 8 commits March 8, 2024 14:02
…terations for Kafka 0.8.2 and Python 3.12 (dpkp#159)

* skip failing tests for PyPy since they work locally

* Reconfigure tests for PyPy and 3.12

* Skip partitioner tests in test_partitioner.py if 3.12 and 0.8.2

* Update test_partitioner.py

* Update test_producer.py

* Timeout tests after ten minutes

* Set 0.8.2.2 to be experimental from hereon

* Formally support PyPy 3.9
* Test Kafka 0.8.2.2 using Python 3.11 in the meantime

* Override PYTHON_LATEST conditionally in python-package.yml

* Update python-package.yml

* add python annotation to kafka version test matrix

* Update python-package.yml

* try python 3.10
* Remove support for EOL'ed versions of Python

* Update setup.py
Too many MRs to review... so little time.
After the Kafka service is stopped and started, kafka-python may use 100% CPU because of a
busy-retry loop while the socket is closed. This fixes the issue by unregistering
the socket if the fd is negative.

Co-authored-by: Orange Kao <orange@aiven.io>
@dpkp dpkp force-pushed the master branch 2 times, most recently from 9c8c8af to 8ebb14c Compare February 12, 2025 22:19

4 participants