From c62877df72f568b75ecab1d2eb71c0820093b12a Mon Sep 17 00:00:00 2001 From: Mirko Bonasorte Date: Wed, 5 Jun 2019 12:20:12 +0200 Subject: [PATCH] More verbosity on exceptions --- kafka/client.py | 2 +- kafka/client_async.py | 6 +++--- kafka/conn.py | 1 + kafka/consumer/base.py | 2 +- kafka/consumer/simple.py | 2 +- kafka/coordinator/base.py | 2 +- kafka/producer/base.py | 4 ++-- kafka/producer/kafka.py | 2 +- 8 files changed, 11 insertions(+), 10 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 148cae0d8..6389fff0f 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -346,7 +346,7 @@ def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn): conn = self._get_conn(host, broker.port, afi) except KafkaConnectionError as e: log.warning('KafkaConnectionError attempting to send request %s ' - 'to server %s: %s', request_id, broker, e) + 'to server %s: %s', request_id, broker, e, exc_info=True) for payload in payloads: topic_partition = (payload.topic, payload.partition) diff --git a/kafka/client_async.py b/kafka/client_async.py index 96c0647b1..fdff87712 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -644,7 +644,7 @@ def _poll(self, timeout): if unexpected_data: # anything other than a 0-byte read means protocol issues log.warning('Protocol out of sync on %r, closing', conn) except socket.error: - pass + log.warning('Socket error', exc_info=True) conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests')) continue @@ -899,10 +899,10 @@ def wakeup(self): try: self._wake_w.sendall(b'x') except socket.timeout: - log.warning('Timeout to send to wakeup socket!') + log.warning('Timeout to send to wakeup socket!', exc_info=True) raise Errors.KafkaTimeoutError() except socket.error: - log.warning('Unable to send to wakeup socket!') + log.warning('Unable to send to wakeup socket!', exc_info=True) def _clear_wake_fd(self): # reading from wake socket should only happen in a single thread diff 
--git a/kafka/conn.py b/kafka/conn.py index 825406c75..2f63c47fd 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -673,6 +673,7 @@ def _try_authenticate_gssapi(self, future): self.close(error=error) return future.failure(error) except Exception as e: + log.exception('Error executing _try_authenticate_gssapi') self._lock.release() return future.failure(e) diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index a77ce7ea0..76583389a 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -163,7 +163,7 @@ def commit(self, partitions=None): try: self.client.send_offset_commit_request(self.group, reqs) except KafkaError as e: - log.error('%s saving offsets: %s', e.__class__.__name__, e) + log.exception('%s saving offsets: %s', e.__class__.__name__, e) return False else: self.count_since_commit = 0 diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index a6a64a58f..cb3c9fe84 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -178,7 +178,7 @@ def reset_partition_offset(self, partition): try: (resp, ) = self.client.send_offset_request(reqs) except KafkaError as e: - log.error('%s sending offset request for %s:%d', + log.exception('%s sending offset request for %s:%d', e.__class__.__name__, self.topic, partition) else: self.offsets[partition] = resp.offsets[0] diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index e538fda33..3e04f573c 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -932,7 +932,7 @@ def run(self): log.debug('Heartbeat thread closed due to coordinator gc') except RuntimeError as e: - log.error("Heartbeat thread for group %s failed due to unexpected error: %s", + log.exception("Heartbeat thread for group %s failed due to unexpected error: %s", self.coordinator.group_id, e) self.failed = e diff --git a/kafka/producer/base.py b/kafka/producer/base.py index b32396634..cac24be63 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ 
-82,8 +82,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, while not stop_event.is_set(): try: client.reinit() - except Exception as e: - log.warning('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms) + except Exception: + log.warning('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms, exc_info=True) time.sleep(float(retry_options.backoff_ms) / 1000) else: break diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index e6bd3b9a6..847354f7b 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -599,7 +599,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest # for API exceptions return them in the future, # for other exceptions raise directly except Errors.BrokerResponseError as e: - log.debug("Exception occurred during message send: %s", e) + log.debug("Exception occurred during message send: %s", e, exc_info=True) return FutureRecordMetadata( FutureProduceResult(TopicPartition(topic, partition)), -1, None, None,