Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More verbosity on exceptions #163

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
conn = self._get_conn(host, broker.port, afi)
except KafkaConnectionError as e:
log.warning('KafkaConnectionError attempting to send request %s '
'to server %s: %s', request_id, broker, e)
'to server %s: %s', request_id, broker, e, exc_info=True)

for payload in payloads:
topic_partition = (payload.topic, payload.partition)
Expand Down
6 changes: 3 additions & 3 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ def _poll(self, timeout):
if unexpected_data: # anything other than a 0-byte read means protocol issues
log.warning('Protocol out of sync on %r, closing', conn)
except socket.error:
pass
log.warning('Socket error', exc_info=True)
conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests'))
continue

Expand Down Expand Up @@ -899,10 +899,10 @@ def wakeup(self):
try:
self._wake_w.sendall(b'x')
except socket.timeout:
log.warning('Timeout to send to wakeup socket!')
log.warning('Timeout to send to wakeup socket!', exc_info=True)
raise Errors.KafkaTimeoutError()
except socket.error:
log.warning('Unable to send to wakeup socket!')
log.warning('Unable to send to wakeup socket!', exc_info=True)

def _clear_wake_fd(self):
# reading from wake socket should only happen in a single thread
Expand Down
1 change: 1 addition & 0 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ def _try_authenticate_gssapi(self, future):
self.close(error=error)
return future.failure(error)
except Exception as e:
log.exception('Error executing _try_authenticate_gssapi', exc_info=True)
self._lock.release()
return future.failure(e)

Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def commit(self, partitions=None):
try:
self.client.send_offset_commit_request(self.group, reqs)
except KafkaError as e:
log.error('%s saving offsets: %s', e.__class__.__name__, e)
log.exception('%s saving offsets: %s', e.__class__.__name__, e)
return False
else:
self.count_since_commit = 0
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def reset_partition_offset(self, partition):
try:
(resp, ) = self.client.send_offset_request(reqs)
except KafkaError as e:
log.error('%s sending offset request for %s:%d',
log.exception('%s sending offset request for %s:%d',
e.__class__.__name__, self.topic, partition)
else:
self.offsets[partition] = resp.offsets[0]
Expand Down
2 changes: 1 addition & 1 deletion kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ def run(self):
log.debug('Heartbeat thread closed due to coordinator gc')

except RuntimeError as e:
log.error("Heartbeat thread for group %s failed due to unexpected error: %s",
log.exception("Heartbeat thread for group %s failed due to unexpected error: %s",
self.coordinator.group_id, e)
self.failed = e

Expand Down
4 changes: 2 additions & 2 deletions kafka/producer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
while not stop_event.is_set():
try:
client.reinit()
except Exception as e:
log.warning('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms)
except Exception:
log.warning('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms, exc_info=True)
time.sleep(float(retry_options.backoff_ms) / 1000)
else:
break
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
# for API exceptions return them in the future,
# for other exceptions raise directly
except Errors.BrokerResponseError as e:
log.debug("Exception occurred during message send: %s", e)
log.debug("Exception occurred during message send: %s", e, exc_info=True)
return FutureRecordMetadata(
FutureProduceResult(TopicPartition(topic, partition)),
-1, None, None,
Expand Down