From 762aea54898e2b12271270a71efdc174b3556b51 Mon Sep 17 00:00:00 2001 From: maximwolpher Date: Wed, 23 Jan 2019 13:28:38 +0100 Subject: [PATCH] timestamp not always integer causing crash. Now checks common cases. --- winton_kafka_streams/processor/_record_collector.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/winton_kafka_streams/processor/_record_collector.py b/winton_kafka_streams/processor/_record_collector.py index cf76f71..7c2d358 100644 --- a/winton_kafka_streams/processor/_record_collector.py +++ b/winton_kafka_streams/processor/_record_collector.py @@ -35,6 +35,10 @@ def send(self, topic, key, value, timestamp, while not produced: try: + if isinstance(timestamp, tuple) and timestamp: + timestamp = timestamp[-1] + if isinstance(timestamp, float): + timestamp = int(timestamp) self.producer.produce(topic, ser_value, ser_key, partition, self.on_delivery, partitioner, timestamp) self.producer.poll(0) # Ensure previous message's delivery reports are served produced = True