Skip to content

Commit

Permalink
Add topic and initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
dspeck1 committed Sep 17, 2024
1 parent fdb142c commit 254d645
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@
main_pipelines = PipelinesConfig(os.environ["MAIN_PIPELINES_CONFIG"])
# Kafka cluster with next visit fanned out messages.
next_visit_fan_out_kafka_cluster = os.environ["NEXT_VISIT_FAN_OUT_KAFKA_CLUSTER"]
# Kafka group for next visit messages.
# Kafka group for next visit fan out messages.
next_visit_kakfa_group_id = os.environ["NEXT_VISIT_KAFKA_GROUP_ID"]
# Kafka topic for next visit fan out messages.
next_visit_fan_out_topic = os.environ["NEXT_VISIT_FAN_OUT_TOPIC"]

_log = logging.getLogger("lsst." + __name__)
_log.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -176,11 +178,23 @@ def create_app():


def keda_start():

setup_usdf_logger(
labels={"instrument": instrument_name},
)

# Check initialization and abort early
_get_consumer()
_get_storage_client()
_get_central_butler()
_get_local_repo()

next_visit_fan_out_consumer = kafka.Consumer({
"bootstrap.servers": next_visit_fan_out_kafka_cluster,
"group.id": next_visit_kakfa_group_id,
"auto.offset.reset": "earliest",
})
next_visit_fan_out_consumer.subscribe(next_visit_fan_out_topic)
next_visit_fan_out_message = next_visit_fan_out_consumer.consume(num_messages=1, timeout=5)
_log.info(next_visit_fan_out_message)

Expand Down

0 comments on commit 254d645

Please sign in to comment.