From 254d6456d7bcfb5b3d8e4e4bc4a9c17aa87091f3 Mon Sep 17 00:00:00 2001 From: dspeck1 Date: Tue, 17 Sep 2024 11:10:54 -0500 Subject: [PATCH] Add topic and initialization --- python/activator/activator.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/python/activator/activator.py b/python/activator/activator.py index a8198535..a3042398 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -78,8 +78,10 @@ main_pipelines = PipelinesConfig(os.environ["MAIN_PIPELINES_CONFIG"]) # Kafka cluster with next visit fanned out messages. next_visit_fan_out_kafka_cluster = os.environ["NEXT_VISIT_FAN_OUT_KAFKA_CLUSTER"] -# Kafka group for next visit messages. +# Kafka group for next visit fan out messages. next_visit_kakfa_group_id = os.environ["NEXT_VISIT_KAFKA_GROUP_ID"] +# Kafka topic for next visit fan out messages. +next_visit_fan_out_topic = os.environ["NEXT_VISIT_FAN_OUT_TOPIC"] _log = logging.getLogger("lsst." + __name__) _log.setLevel(logging.DEBUG) @@ -176,11 +178,23 @@ def create_app(): def keda_start(): + + setup_usdf_logger( + labels={"instrument": instrument_name}, + ) + + # Check initialization and abort early + _get_consumer() + _get_storage_client() + _get_central_butler() + _get_local_repo() + next_visit_fan_out_consumer = kafka.Consumer({ "bootstrap.servers": next_visit_fan_out_kafka_cluster, "group.id": next_visit_kakfa_group_id, "auto.offset.reset": "earliest", }) + next_visit_fan_out_consumer.subscribe(next_visit_fan_out_topic) next_visit_fan_out_message = next_visit_fan_out_consumer.consume(num_messages=1, timeout=5) _log.info(next_visit_fan_out_message)