From 6fb2b08947b2c5feeb59c21f52fa2ce2fdf620f2 Mon Sep 17 00:00:00 2001 From: dspeck1 Date: Tue, 1 Oct 2024 14:57:43 -0500 Subject: [PATCH] Add json deserializer. --- python/activator/activator.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index 0aa428a2..2501eb00 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -177,6 +177,8 @@ def create_app(): # gunicorn assumes exit code 3 means "Worker failed to boot", though this is not documented sys.exit(3) +def deserializer(serialized): + return json.loads(serialized) def keda_start(): @@ -198,6 +200,7 @@ def keda_start(): "bootstrap.servers": next_visit_fan_out_kafka_cluster, "group.id": next_visit_kakfa_group_id, "auto.offset.reset": "earliest", + "value_deserializer": deserializer }) next_visit_fan_out_consumer.subscribe([next_visit_fan_out_topic]) next_visit_fan_out_message = next_visit_fan_out_consumer.consume(num_messages=1, timeout=5) @@ -208,7 +211,7 @@ def keda_start(): _log.warning("Consumer event: %s", msg.error()) else: _log.info(msg) - visit = FannedOutVisit(msg) + visit = FannedOutVisit(**msg) _log.info("Unpacked message as %r.", visit)