From 8be6838656fb420b732096e5f09d1489b0889983 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Fri, 8 Nov 2024 22:49:39 -0600 Subject: [PATCH 1/6] messaging: Make exchange/queue name prefix configurable This, adds a new messaging.prefix option that gets modified to include the test slot as needed. Exchanges are defined as module level vars initialized on import. But, oslo_config is not setup yet at import time, so delay applying the new prefix setting until just before they get created in RabbitMQ. Luckily, Exchange() objects are lightweight objects that just hold the exchange name and type. They could do more, but we don't bind them to a channel. Because the exchange objects merely hold strings, and because they are basically singletons (module-level vars), we can safely update the exchange name just before declaring it in RabbitMQ. From that point on, everything that uses a singleton exchange object will get the updated name. --- conf/st2.conf.sample | 2 ++ .../test_actions_queue_consumer.py | 5 ++- .../micro/test_publisher_compression.py | 2 +- st2common/st2common/config.py | 5 +++ st2common/st2common/transport/actionalias.py | 3 +- .../transport/actionexecutionstate.py | 3 +- st2common/st2common/transport/announcement.py | 3 +- .../st2common/transport/bootstrap_utils.py | 3 +- st2common/st2common/transport/execution.py | 3 +- st2common/st2common/transport/kombu.py | 36 +++++++++++++++++++ st2common/st2common/transport/liveaction.py | 3 +- st2common/st2common/transport/queues.py | 3 +- st2common/st2common/transport/reactor.py | 2 +- st2common/st2common/transport/workflow.py | 15 +++----- st2common/st2common/util/queues.py | 4 +-- st2common/tests/unit/test_queue_consumer.py | 1 + st2common/tests/unit/test_state_publisher.py | 1 + st2common/tests/unit/test_transport.py | 5 ++- st2tests/st2tests/config.py | 18 +++++++++- 19 files changed, 85 insertions(+), 32 deletions(-) create mode 100644 st2common/st2common/transport/kombu.py diff --git a/conf/st2.conf.sample b/conf/st2.conf.sample index 5eed1ffd4c..3c9d005d0a 100644 --- a/conf/st2.conf.sample +++ b/conf/st2.conf.sample @@ -230,6 +230,8 @@ connection_retries = 10 connection_retry_wait = 10000 # Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.). login_method = None +# Prefix for all exchange and queue names. +prefix = st2 # Use SSL / TLS to connect to the messaging server. Same as appending "?ssl=true" at the end of the connection URL string. ssl = False # ca_certs file contains a set of concatenated CA certificates, which are used to validate certificates passed from RabbitMQ. diff --git a/st2actions/tests/integration/test_actions_queue_consumer.py b/st2actions/tests/integration/test_actions_queue_consumer.py index 149e94b0f3..aa37b5e589 100644 --- a/st2actions/tests/integration/test_actions_queue_consumer.py +++ b/st2actions/tests/integration/test_actions_queue_consumer.py @@ -17,11 +17,10 @@ import random import eventlet -from kombu import Exchange -from kombu import Queue from unittest import TestCase from st2common.transport.consumers import ActionsQueueConsumer +from st2common.transport.kombu import Exchange, Queue from st2common.transport.publishers import PoolPublisher from st2common.transport import utils as transport_utils from st2common.models.db.liveaction import LiveActionDB @@ -35,7 +34,7 @@ class ActionsQueueConsumerTestCase(TestCase): def test_stop_consumption_on_shutdown(self): exchange = Exchange("st2.execution.test", type="topic") - queue_name = "test-" + str(random.randint(1, 10000)) + queue_name = f"st2.test-{random.randint(1, 10000)}" queue = Queue( name=queue_name, exchange=exchange, routing_key="#", auto_delete=True ) diff --git a/st2common/benchmarks/micro/test_publisher_compression.py b/st2common/benchmarks/micro/test_publisher_compression.py index 51d13fd8a4..0faabf86a1 100644 --- a/st2common/benchmarks/micro/test_publisher_compression.py +++ b/st2common/benchmarks/micro/test_publisher_compression.py @@ -16,7 +16,6 @@ monkey_patch() -from kombu import Exchange from kombu.serialization import pickle import os @@ -27,6 +26,7 @@ from st2common.models.db.liveaction import LiveActionDB from st2common.transport import publishers +from st2common.transport.kombu import Exchange from common import FIXTURES_DIR from common import PYTEST_FIXTURE_FILE_PARAM_DECORATOR diff --git a/st2common/st2common/config.py b/st2common/st2common/config.py index 15a4d5b32a..a9461951d1 100644 --- a/st2common/st2common/config.py +++ b/st2common/st2common/config.py @@ -404,6 +404,11 @@ def register_opts(ignore_errors=False): help="Compression algorithm to use for compressing the payloads which are sent over " "the message bus. Defaults to no compression.", ), + cfg.StrOpt( + "prefix", + default="st2", + help="Prefix for all exchange and queue names.", + ), ] do_register_opts(messaging_opts, "messaging", ignore_errors) diff --git a/st2common/st2common/transport/actionalias.py b/st2common/st2common/transport/actionalias.py index 33fff1e92d..a083b8588b 100644 --- a/st2common/st2common/transport/actionalias.py +++ b/st2common/st2common/transport/actionalias.py @@ -15,8 +15,9 @@ # All Exchanges and Queues related to liveaction. from __future__ import absolute_import -from kombu import Exchange, Queue + from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = [ "ActionAliasPublisher", diff --git a/st2common/st2common/transport/actionexecutionstate.py b/st2common/st2common/transport/actionexecutionstate.py index 46fe095fbf..581fe0a5a5 100644 --- a/st2common/st2common/transport/actionexecutionstate.py +++ b/st2common/st2common/transport/actionexecutionstate.py @@ -17,9 +17,8 @@ from __future__ import absolute_import -from kombu import Exchange, Queue - from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = ["ActionExecutionStatePublisher"] diff --git a/st2common/st2common/transport/announcement.py b/st2common/st2common/transport/announcement.py index ff1c005a78..76f1ce8b34 100644 --- a/st2common/st2common/transport/announcement.py +++ b/st2common/st2common/transport/announcement.py @@ -15,12 +15,11 @@ from __future__ import absolute_import -from kombu import Exchange, Queue - from st2common import log as logging from st2common.constants.trace import TRACE_CONTEXT from st2common.models.api.trace import TraceContext from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = ["AnnouncementPublisher", "AnnouncementDispatcher", "get_queue"] diff --git a/st2common/st2common/transport/bootstrap_utils.py b/st2common/st2common/transport/bootstrap_utils.py index b2eb8c7c2d..b5a22b7eda 100644 --- a/st2common/st2common/transport/bootstrap_utils.py +++ b/st2common/st2common/transport/bootstrap_utils.py @@ -31,7 +31,7 @@ from st2common.transport.actionexecutionstate import ACTIONEXECUTIONSTATE_XCHG from st2common.transport.announcement import ANNOUNCEMENT_XCHG from st2common.transport.connection_retry_wrapper import ConnectionRetryWrapper -from st2common.transport.execution import EXECUTION_XCHG +from st2common.transport.execution import EXECUTION_XCHG, EXECUTION_OUTPUT_XCHG from st2common.transport.liveaction import LIVEACTION_XCHG, LIVEACTION_STATUS_MGMT_XCHG from st2common.transport.reactor import SENSOR_CUD_XCHG from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG @@ -67,6 +67,7 @@ ACTIONEXECUTIONSTATE_XCHG, ANNOUNCEMENT_XCHG, EXECUTION_XCHG, + EXECUTION_OUTPUT_XCHG, LIVEACTION_XCHG, LIVEACTION_STATUS_MGMT_XCHG, TRIGGER_CUD_XCHG, diff --git a/st2common/st2common/transport/execution.py b/st2common/st2common/transport/execution.py index 5d2880fd6f..3ffeafe345 100644 --- a/st2common/st2common/transport/execution.py +++ b/st2common/st2common/transport/execution.py @@ -16,8 +16,9 @@ # All Exchanges and Queues related to liveaction. from __future__ import absolute_import -from kombu import Exchange, Queue + from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = [ "ActionExecutionPublisher", diff --git a/st2common/st2common/transport/kombu.py b/st2common/st2common/transport/kombu.py new file mode 100644 index 0000000000..5074399b0b --- /dev/null +++ b/st2common/st2common/transport/kombu.py @@ -0,0 +1,36 @@ +# Copyright 2024 The StackStorm Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import kombu +from oslo_config import cfg + + +class Exchange(kombu.Exchange): + def __call__(self, *args, **kwargs): + # update exchange name with prefix just before binding (as late as possible). + prefix = cfg.CONF.messaging.prefix + if self.name and prefix != "st2": + self.name = self.name.replace("st2.", f"{prefix}.", 1) + return super().__call__(*args, **kwargs) + + +class Queue(kombu.Queue): + def __call__(self, *args, **kwargs): + # update queue name with prefix just before binding (as late as possible). + prefix = cfg.CONF.messaging.prefix + if self.name and prefix != "st2": + self.name = self.name.replace("st2.", f"{prefix}.", 1) + return super().__call__(*args, **kwargs) diff --git a/st2common/st2common/transport/liveaction.py b/st2common/st2common/transport/liveaction.py index 670c5ebb2e..e410c5d933 100644 --- a/st2common/st2common/transport/liveaction.py +++ b/st2common/st2common/transport/liveaction.py @@ -17,9 +17,8 @@ from __future__ import absolute_import -from kombu import Exchange, Queue - from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = ["LiveActionPublisher", "get_queue", "get_status_management_queue"] diff --git a/st2common/st2common/transport/queues.py b/st2common/st2common/transport/queues.py index 33a3f26d03..eefe72b964 100644 --- a/st2common/st2common/transport/queues.py +++ b/st2common/st2common/transport/queues.py @@ -22,8 +22,6 @@ from __future__ import absolute_import -from kombu import Queue - from st2common.constants import action as action_constants from st2common.transport import actionalias from st2common.transport import actionexecutionstate @@ -33,6 +31,7 @@ from st2common.transport import publishers from st2common.transport import reactor from st2common.transport import workflow +from st2common.transport.kombu import Queue __all__ = [ "ACTIONSCHEDULER_REQUEST_QUEUE", diff --git a/st2common/st2common/transport/reactor.py b/st2common/st2common/transport/reactor.py index 3dba86344f..657a35c6b3 100644 --- a/st2common/st2common/transport/reactor.py +++ b/st2common/st2common/transport/reactor.py @@ -14,12 +14,12 @@ # limitations under the License. from __future__ import absolute_import -from kombu import Exchange, Queue from st2common import log as logging from st2common.constants.trace import TRACE_CONTEXT from st2common.models.api.trace import TraceContext from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = [ "TriggerCUDPublisher", diff --git a/st2common/st2common/transport/workflow.py b/st2common/st2common/transport/workflow.py index 0302611a36..f4cdc3fbba 100644 --- a/st2common/st2common/transport/workflow.py +++ b/st2common/st2common/transport/workflow.py @@ -17,16 +17,13 @@ from __future__ import absolute_import -import kombu - from st2common.transport import publishers +from st2common.transport.kombu import Exchange, Queue __all__ = ["WorkflowExecutionPublisher", "get_queue", "get_status_management_queue"] -WORKFLOW_EXECUTION_XCHG = kombu.Exchange("st2.workflow", type="topic") -WORKFLOW_EXECUTION_STATUS_MGMT_XCHG = kombu.Exchange( - "st2.workflow.status", type="topic" -) +WORKFLOW_EXECUTION_XCHG = Exchange("st2.workflow", type="topic") +WORKFLOW_EXECUTION_STATUS_MGMT_XCHG = Exchange("st2.workflow.status", type="topic") class WorkflowExecutionPublisher( @@ -40,10 +37,8 @@ def __init__(self): def get_queue(name, routing_key): - return kombu.Queue(name, WORKFLOW_EXECUTION_XCHG, routing_key=routing_key) + return Queue(name, WORKFLOW_EXECUTION_XCHG, routing_key=routing_key) def get_status_management_queue(name, routing_key): - return kombu.Queue( - name, WORKFLOW_EXECUTION_STATUS_MGMT_XCHG, routing_key=routing_key - ) + return Queue(name, WORKFLOW_EXECUTION_STATUS_MGMT_XCHG, routing_key=routing_key) diff --git a/st2common/st2common/util/queues.py b/st2common/st2common/util/queues.py index 9fce3b20a7..b8c7372fa3 100644 --- a/st2common/st2common/util/queues.py +++ b/st2common/st2common/util/queues.py @@ -47,7 +47,7 @@ def get_queue_name(queue_name_base, queue_name_suffix, add_random_uuid_to_suffix # might cause issues in RabbitMQ. u_hex = uuid.uuid4().hex uuid_suffix = uuid.uuid4().hex[len(u_hex) - 10 :] - queue_suffix = "%s-%s" % (queue_name_suffix, uuid_suffix) + queue_suffix = f"{queue_name_suffix}-{uuid_suffix}" - queue_name = "%s.%s" % (queue_name_base, queue_suffix) + queue_name = f"{queue_name_base}.{queue_suffix}" return queue_name diff --git a/st2common/tests/unit/test_queue_consumer.py b/st2common/tests/unit/test_queue_consumer.py index 463eb0def4..4461444255 100644 --- a/st2common/tests/unit/test_queue_consumer.py +++ b/st2common/tests/unit/test_queue_consumer.py @@ -23,6 +23,7 @@ from tests.unit.base import FakeModelDB +# AMQP connection is mocked, so these do not need messaging.prefix FAKE_XCHG = Exchange("st2.tests", type="topic") FAKE_WORK_Q = Queue("st2.tests.unit", FAKE_XCHG) diff --git a/st2common/tests/unit/test_state_publisher.py b/st2common/tests/unit/test_state_publisher.py index 86bb67c2cf..7961cb7dc3 100644 --- a/st2common/tests/unit/test_state_publisher.py +++ b/st2common/tests/unit/test_state_publisher.py @@ -31,6 +31,7 @@ from st2tests import DbTestCase +# PoolPublisher is mocked, so this does not need messaging.prefix FAKE_STATE_MGMT_XCHG = kombu.Exchange("st2.fake.state", type="topic") diff --git a/st2common/tests/unit/test_transport.py b/st2common/tests/unit/test_transport.py index ae12b1ea9d..71d18e6010 100644 --- a/st2common/tests/unit/test_transport.py +++ b/st2common/tests/unit/test_transport.py @@ -25,13 +25,12 @@ from bson.objectid import ObjectId from kombu.mixins import ConsumerMixin -from kombu import Exchange -from kombu import Queue from oslo_config import cfg from st2common.transport.publishers import PoolPublisher from st2common.transport.utils import _get_ssl_kwargs from st2common.transport import utils as transport_utils +from st2common.transport.kombu import Exchange, Queue from st2common.models.db.liveaction import LiveActionDB __all__ = ["TransportUtilsTestCase"] @@ -69,7 +68,7 @@ def test_publish_compression(self): live_action_db.result = {"foo": "bar"} exchange = Exchange("st2.execution.test", type="topic") - queue_name = "test-" + str(random.randint(1, 10000)) + queue_name = f"st2.test-{random.randint(1, 10000)}" queue = Queue( name=queue_name, exchange=exchange, routing_key="#", auto_delete=True ) diff --git a/st2tests/st2tests/config.py b/st2tests/st2tests/config.py index 6a2e13ea89..e95e5f0799 100644 --- a/st2tests/st2tests/config.py +++ b/st2tests/st2tests/config.py @@ -66,6 +66,7 @@ def _setup_config_opts(coordinator_noop=True): def _override_config_opts(coordinator_noop=False): _override_db_opts() + _override_mq_opts() _override_common_opts() _override_api_opts() _override_keyvalue_opts() @@ -107,8 +108,18 @@ def db_opts_as_env_vars() -> Dict[str, str]: return env +def _override_mq_opts(): + mq_prefix = CONF.messaging.prefix + mq_prefix = "st2test" if mq_prefix == "st2" else mq_prefix + mq_prefix = mq_prefix + os.environ.get("ST2TESTS_PARALLEL_SLOT", "") + CONF.set_override(name="prefix", override=mq_prefix, group="messaging") + + def mq_opts_as_env_vars() -> Dict[str, str]: - return {"ST2_MESSAGING__URL": CONF.messaging.url} + return { + "ST2_MESSAGING__URL": CONF.messaging.url, + "ST2_MESSAGING__PREFIX": CONF.messaging.prefix, + } def _override_common_opts(): @@ -269,6 +280,11 @@ def _register_api_opts(): default=None, help="Login method to use (AMQPLAIN, PLAIN, EXTERNAL, etc.).", ), + cfg.StrOpt( + "prefix", + default="st2", + help="Prefix for all exchange and queue names.", + ), ] _register_opts(messaging_opts, group="messaging") From 921416e436d571b235f867379862682eb0b1fec8 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Sat, 9 Nov 2024 19:37:05 -0600 Subject: [PATCH 2/6] stream: use well known st2. prefix in event names Now that the exchange name prefix is configurable, we have to undo the prefix to maintain a backwards compatible API. Also, having the event names vary based on config would be a very bad UX, so we don't want to go there anyway. --- st2common/st2common/stream/listener.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/st2common/st2common/stream/listener.py b/st2common/st2common/stream/listener.py index 759133f96f..5eb5dc8b8f 100644 --- a/st2common/st2common/stream/listener.py +++ b/st2common/st2common/stream/listener.py @@ -58,9 +58,15 @@ def get_consumers(self, consumer, channel): raise NotImplementedError("get_consumers() is not implemented") def processor(self, model=None): + exchange_prefix = cfg.CONF.messaging.prefix + def process(body, message): meta = message.delivery_info - event_name = "%s__%s" % (meta.get("exchange"), meta.get("routing_key")) + event_prefix = meta.get("exchange", "") + if exchange_prefix != "st2" and event_prefix.startswith(exchange_prefix): + # use well-known event names over configurable exchange names + event_prefix = event_prefix.replace(f"{exchange_prefix}.", "st2.", 1) + event_name = f"{event_prefix}__{meta.get('routing_key')}" try: if model: From c7c4263d12b6525befd7ed456b4e87eebe208d9a Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Fri, 8 Nov 2024 23:49:54 -0600 Subject: [PATCH 3/6] launchdev.sh: Update default messaging.prefix to st2dev --- conf/st2.dev.conf | 1 + 1 file changed, 1 insertion(+) diff --git a/conf/st2.dev.conf b/conf/st2.dev.conf index cf2b5b6596..6755403cf6 100644 --- a/conf/st2.dev.conf +++ b/conf/st2.dev.conf @@ -103,6 +103,7 @@ ssh_key_file = /home/vagrant/.ssh/stanley_rsa [messaging] url = amqp://guest:guest@127.0.0.1:5672/ +prefix = st2dev # Uncomment to test SSL options #url = amqp://guest:guest@127.0.0.1:5671/ #ssl = True From ac9bcbd3400ab51dbfe6656fa019051c89379fe2 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Mon, 11 Nov 2024 22:31:35 -0600 Subject: [PATCH 4/6] Use Producer.auto_declare to declare exchanges as needed Connection has a cache of which entities (exchange/queue) have been declared, so this shouldn't have too much of a performance impact. This does, however, make tests much more reliable. --- st2common/st2common/transport/publishers.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/st2common/st2common/transport/publishers.py b/st2common/st2common/transport/publishers.py index 73596ed70e..62484d4c46 100644 --- a/st2common/st2common/transport/publishers.py +++ b/st2common/st2common/transport/publishers.py @@ -71,10 +71,11 @@ def do_publish(connection, channel): # completely invalidating this ConnectionPool. Also, a ConnectionPool for # producer does not really solve any problems for us so better to create a # Producer for each publish. - producer = Producer(channel) + # passing exchange to Producer __init__ allows auto_declare to declare + # anything that's missing (especially useful for tests). + producer = Producer(channel, exchange=exchange) kwargs = { "body": payload, - "exchange": exchange, "routing_key": routing_key, "serializer": "pickle", "compression": compression, From 3ca0b037801dfaced826c2e09060638ee026a529 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Mon, 11 Nov 2024 14:49:22 -0600 Subject: [PATCH 5/6] tests: respect ST2_MESSAGING_* env vars --- pants-plugins/uses_services/rabbitmq_rules.py | 32 +++++++++++++++---- .../uses_services/rabbitmq_rules_test.py | 11 +++++-- pants.toml | 3 ++ 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/pants-plugins/uses_services/rabbitmq_rules.py b/pants-plugins/uses_services/rabbitmq_rules.py index 3d5fd13024..89df3976f9 100644 --- a/pants-plugins/uses_services/rabbitmq_rules.py +++ b/pants-plugins/uses_services/rabbitmq_rules.py @@ -15,6 +15,7 @@ from dataclasses import dataclass from textwrap import dedent +from typing import Tuple from pants.backend.python.goals.pytest_runner import ( PytestPluginSetupRequest, @@ -27,6 +28,8 @@ VenvPexProcess, rules as pex_rules, ) +from pants.core.goals.test import TestExtraEnv +from pants.engine.env_vars import EnvironmentVars from pants.engine.fs import CreateDigest, Digest, FileContent from pants.engine.rules import collect_rules, Get, MultiGet, rule from pants.engine.process import FallibleProcessResult, ProcessCacheScope @@ -54,13 +57,17 @@ class UsesRabbitMQRequest: # These config opts for integration tests are in: # conf/st2.tests*.conf st2tests/st2tests/fixtures/conf/st2.tests*.conf # (changed by setting ST2_CONFIG_PATH env var inside the tests) - # TODO: for unit tests: modify code to pull mq connect settings from env vars - # TODO: for int tests: modify st2.tests*.conf on the fly to set the per-pantsd-slot vhost - # and either add env vars for mq connect settings or modify conf files as well + # These can also be updated via the ST2_MESSAGING_* env vars (which oslo_config reads). + # Integration tests should pass these changes onto subprocesses via the same env vars. - # with our version of oslo.config (newer are slower) we can't directly override opts w/ environment variables. + mq_urls: Tuple[str] = ("amqp://guest:guest@127.0.0.1:5672//",) - mq_urls: tuple[str] = ("amqp://guest:guest@127.0.0.1:5672//",) + @classmethod + def from_env(cls, env: EnvironmentVars) -> UsesRabbitMQRequest: + default = cls() + url = env.get("ST2_MESSAGING__URL", None) + mq_urls = (url,) if url else default.mq_urls + return UsesRabbitMQRequest(mq_urls=mq_urls) @dataclass(frozen=True) @@ -83,9 +90,12 @@ def is_applicable(cls, target: Target) -> bool: ) async def rabbitmq_is_running_for_pytest( request: PytestUsesRabbitMQRequest, + test_extra_env: TestExtraEnv, ) -> PytestPluginSetup: # this will raise an error if rabbitmq is not running - _ = await Get(RabbitMQIsRunning, UsesRabbitMQRequest()) + _ = await Get( + RabbitMQIsRunning, UsesRabbitMQRequest.from_env(env=test_extra_env.env) + ) return PytestPluginSetup() @@ -167,6 +177,16 @@ async def rabbitmq_is_running( """ ), service_start_cmd_generic="systemctl start rabbitmq-server", + env_vars_hint=dedent( + """\ + You can also export the ST2_MESSAGING__URL env var to automatically use any + RabbitMQ host, local or remote, while running unit and integration tests. + If needed, you can also override the default exchange/queue name prefix + by exporting ST2_MESSAGING__PREFIX. Note that tests always add a numeric + suffix to the exchange/queue name prefix so that tests can safely run + in parallel. + """ + ), ), ) diff --git a/pants-plugins/uses_services/rabbitmq_rules_test.py b/pants-plugins/uses_services/rabbitmq_rules_test.py index 9eb4ae5055..c1945bda43 100644 --- a/pants-plugins/uses_services/rabbitmq_rules_test.py +++ b/pants-plugins/uses_services/rabbitmq_rules_test.py @@ -51,7 +51,14 @@ def run_rabbitmq_is_running( "--backend-packages=uses_services", *(extra_args or ()), ], - env_inherit={"PATH", "PYENV_ROOT", "HOME"}, + env_inherit={ + "PATH", + "PYENV_ROOT", + "HOME", + "ST2_MESSAGING__URL", + "ST2_MESSAGING__PREFIX", + "ST2TESTS_PARALLEL_SLOT", + }, ) result = rule_runner.request( RabbitMQIsRunning, @@ -62,7 +69,7 @@ def run_rabbitmq_is_running( # Warning this requires that rabbitmq be running def test_rabbitmq_is_running(rule_runner: RuleRunner) -> None: - request = UsesRabbitMQRequest() + request = UsesRabbitMQRequest.from_env(env=rule_runner.environment) mock_platform = platform(os="TestMock") # we are asserting that this does not raise an exception diff --git a/pants.toml b/pants.toml index 23a2c5f4a2..8d5568daab 100644 --- a/pants.toml +++ b/pants.toml @@ -247,6 +247,9 @@ extra_env_vars = [ "ST2_DATABASE__CONNECTION_TIMEOUT", "ST2_DATABASE__USERNAME", "ST2_DATABASE__PASSWORD", + # Use these to override RabbitMQ connection details + "ST2_MESSAGING__URL", + "ST2_MESSAGING__PREFIX", # Tests will modify this to be "{prefix}{ST2TESTS_PARALLEL_SLOT}" # Use these to override the redis host and port "ST2TESTS_REDIS_HOST", "ST2TESTS_REDIS_PORT", From b6d23c9c20d2d91ce1c3f4bc9a783a81dc49633f Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Sat, 23 Nov 2024 11:01:53 -0600 Subject: [PATCH 6/6] add changelog entry --- CHANGELOG.rst | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a94babb48b..39122fb1c1 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -69,7 +69,7 @@ Added working on StackStorm, improve our security posture, and improve CI reliability thanks in part to pants' use of PEX lockfiles. This is not a user-facing addition. #6118 #6141 #6133 #6120 #6181 #6183 #6200 #6237 #6229 #6240 #6241 #6244 #6251 #6253 - #6254 #6258 #6259 #6260 #6269 #6275 #6279 #6278 + #6254 #6258 #6259 #6260 #6269 #6275 #6279 #6278 #6282 Contributed by @cognifloyd * Build of ST2 EL9 packages #6153 Contributed by @amanda11 @@ -93,6 +93,12 @@ Added If you experience any issues when using this experimental feature, please file an issue. #6277 Contributed by @cognifloyd +* Add new option `[messaging].prefix` to configure the prefix used in RabbitMQ exchanges and queues. + The default is `st2` (resulting in exchange names like `st2.execution` and `st2.sensor`). + This is primarily designed to support safely running tests in parallel where creating a vhost for + each parallel test run would be a maintenance burden. #6282 + Contributed by @cognifloyd + 3.8.1 - December 13, 2023 ------------------------- Fixed