Skip to content

Commit

Permalink
feat: logged cancelled exception as info messages
Browse files Browse the repository at this point in the history
we manual interrupt our connection so all inflight request which was canceled by this reason shouldn't log it as an error
  • Loading branch information
Arfey committed Nov 25, 2020
1 parent 53dc740 commit 2f93470
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
15 changes: 12 additions & 3 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ def send_fetches(self):
if self._client.ready(node_id):
log.debug("Sending FetchRequest to node %s", node_id)
future = self._client.send(node_id, request, wakeup=False)
future.add_callback(self._handle_fetch_response, request, time.time())
future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
future.add_callback(self._handle_fetch_success, request, time.time())
future.add_errback(self._handle_fetch_error, node_id)
futures.append(future)
self._fetch_futures.extend(futures)
self._clean_done_fetch_futures()
Expand Down Expand Up @@ -747,7 +747,7 @@ def _create_fetch_requests(self):
partition_data)
return requests

def _handle_fetch_response(self, request, send_time, response):
def _handle_fetch_success(self, request, send_time, response):
"""The callback for fetch completion"""
fetch_offsets = {}
for topic, partitions in request.topics:
Expand Down Expand Up @@ -778,6 +778,15 @@ def _handle_fetch_response(self, request, send_time, response):
self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
self._sensors.fetch_latency.record((time.time() - send_time) * 1000)

def _handle_fetch_error(self, node_id, err):
"""The callback for fetch error"""
msg = 'Fetch to node %s failed: %s' % (node_id, err)

if isinstance(err, Errors.Cancelled):
log.info(msg)
else:
log.error(msg)

def _parse_fetched_data(self, completed_fetch):
tp = completed_fetch.topic_partition
fetch_offset = completed_fetch.fetched_offset
Expand Down
4 changes: 2 additions & 2 deletions test/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,8 @@ def test_fetched_records(fetcher, topic, mocker):
1,
),
])
def test__handle_fetch_response(fetcher, fetch_request, fetch_response, num_partitions):
fetcher._handle_fetch_response(fetch_request, time.time(), fetch_response)
def test__handle_fetch_success(fetcher, fetch_request, fetch_response, num_partitions):
fetcher._handle_fetch_success(fetch_request, time.time(), fetch_response)
assert len(fetcher._completed_fetches) == num_partitions


Expand Down

0 comments on commit 2f93470

Please sign in to comment.