diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 9c68cbd47..a4bacfb04 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -272,18 +272,36 @@ async def scale_application( ) -def is_relation_joined(ops_test: OpsTest, endpoint_one: str, endpoint_two: str) -> bool: +def is_relation_joined( + ops_test: OpsTest, + endpoint_one: str, + endpoint_two: str, + application_one: Optional[str] = None, + application_two: Optional[str] = None, +) -> bool: """Check if a relation is joined. Args: ops_test: The ops test object passed into every test case endpoint_one: The first endpoint of the relation endpoint_two: The second endpoint of the relation + application_one: The name of the first application + application_two: The name of the second application """ for rel in ops_test.model.relations: - endpoints = [endpoint.name for endpoint in rel.endpoints] - if endpoint_one in endpoints and endpoint_two in endpoints: - return True + if application_one and application_two: + endpoints = [ + f"{endpoint.application_name}:{endpoint.name}" for endpoint in rel.endpoints + ] + if ( + f"{application_one}:{endpoint_one}" in endpoints + and f"{application_two}:{endpoint_two}" in endpoints + ): + return True + else: + endpoints = [endpoint.name for endpoint in rel.endpoints] + if endpoint_one in endpoints and endpoint_two in endpoints: + return True return False diff --git a/tests/integration/high_availability/high_availability_helpers.py b/tests/integration/high_availability/high_availability_helpers.py index 1e6d9e108..3b0172d13 100644 --- a/tests/integration/high_availability/high_availability_helpers.py +++ b/tests/integration/high_availability/high_availability_helpers.py @@ -17,7 +17,7 @@ from lightkube import Client from lightkube.models.meta_v1 import ObjectMeta from lightkube.resources.apps_v1 import StatefulSet -from lightkube.resources.core_v1 import PersistentVolume, PersistentVolumeClaim, Pod +from lightkube.resources.core_v1 import Endpoints, PersistentVolume, PersistentVolumeClaim, Pod from pytest_operator.plugin import OpsTest from tenacity import RetryError, Retrying, retry, stop_after_attempt, stop_after_delay, wait_fixed @@ -122,6 +122,7 @@ async def deploy_and_scale_mysql( mysql_application_name: str = MYSQL_DEFAULT_APP_NAME, num_units: int = 3, model: Optional[Model] = None, + cluster_name: str = CLUSTER_NAME, ) -> str: """Deploys and scales the mysql application charm. @@ -132,6 +133,7 @@ async def deploy_and_scale_mysql( mysql_application_name: The name of the mysql application if it is to be deployed num_units: The number of units to deploy model: The model to deploy the mysql application to + cluster_name: The name of the mysql cluster """ application_name = get_application_name(ops_test, "mysql") if not model: @@ -150,7 +152,7 @@ async def deploy_and_scale_mysql( # Cache the built charm to avoid rebuilding it between tests mysql_charm = charm - config = {"cluster-name": CLUSTER_NAME, "profile": "testing"} + config = {"cluster-name": cluster_name, "profile": "testing"} resources = {"mysql-image": METADATA["resources"]["mysql-image"]["upstream-source"]} async with ops_test.fast_forward("60s"): @@ -177,15 +179,21 @@ async def deploy_and_scale_mysql( return mysql_application_name -async def deploy_and_scale_application(ops_test: OpsTest) -> str: +async def deploy_and_scale_application( + ops_test: OpsTest, + check_for_existing_application: bool = True, + test_application_name: str = APPLICATION_DEFAULT_APP_NAME, +) -> str: """Deploys and scales the test application charm. Args: ops_test: The ops test framework + check_for_existing_application: Whether to check for existing test applications + test_application_name: Name of test application to be deployed """ - application_name = get_application_name(ops_test, APPLICATION_DEFAULT_APP_NAME) + application_name = get_application_name(ops_test, test_application_name) - if application_name: + if check_for_existing_application and application_name: if len(ops_test.model.applications[application_name].units) != 1: async with ops_test.fast_forward("60s"): await scale_application(ops_test, application_name, 1) @@ -195,22 +203,22 @@ async def deploy_and_scale_application(ops_test: OpsTest) -> str: async with ops_test.fast_forward("60s"): await ops_test.model.deploy( APPLICATION_DEFAULT_APP_NAME, - application_name=APPLICATION_DEFAULT_APP_NAME, + application_name=test_application_name, num_units=1, channel="latest/edge", base="ubuntu@22.04", ) await ops_test.model.wait_for_idle( - apps=[APPLICATION_DEFAULT_APP_NAME], + apps=[test_application_name], status="waiting", raise_on_blocked=True, timeout=TIMEOUT, ) - assert len(ops_test.model.applications[APPLICATION_DEFAULT_APP_NAME].units) == 1 + assert len(ops_test.model.applications[test_application_name].units) == 1 - return APPLICATION_DEFAULT_APP_NAME + return test_application_name async def relate_mysql_and_application( @@ -223,13 +231,27 @@ async def relate_mysql_and_application( mysql_application_name: The mysql charm application name application_name: The continuous writes test charm application name """ - if is_relation_joined(ops_test, "database", "database"): + if is_relation_joined( + ops_test, + "database", + "database", + application_one=mysql_application_name, + application_two=application_name, + ): return await ops_test.model.relate( f"{application_name}:database", f"{mysql_application_name}:database" ) - await ops_test.model.block_until(lambda: is_relation_joined(ops_test, "database", "database")) + await ops_test.model.block_until( + lambda: is_relation_joined( + ops_test, + "database", + "database", + application_one=mysql_application_name, + application_two=application_name, + ) + ) await ops_test.model.wait_for_idle( apps=[mysql_application_name, application_name], @@ -669,3 +691,15 @@ def delete_pod(ops_test: OpsTest, unit: Unit) -> None: ], check=True, ) + + +def get_endpoint_addresses(ops_test: OpsTest, endpoint_name: str) -> list[str]: + """Retrieve the addresses selected by a K8s endpoint.""" + client = lightkube.Client() + endpoint = client.get( + Endpoints, + namespace=ops_test.model.info.name, + name=endpoint_name, + ) + + return [address.ip for subset in endpoint.subsets for address in subset.addresses] diff --git a/tests/integration/high_availability/test_k8s_endpoints.py b/tests/integration/high_availability/test_k8s_endpoints.py new file mode 100644 index 000000000..bcd846e94 --- /dev/null +++ b/tests/integration/high_availability/test_k8s_endpoints.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +import logging + +import pytest +from pytest_operator.plugin import OpsTest + +from ..helpers import get_unit_address +from .high_availability_helpers import ( + deploy_and_scale_application, + deploy_and_scale_mysql, + get_endpoint_addresses, + relate_mysql_and_application, +) + +logger = logging.getLogger(__name__) + +MYSQL_CLUSTER_ONE = "mysql1" +MYSQL_CLUSTER_TWO = "mysql2" +MYSQL_CLUSTER_NAME = "test_cluster" +TEST_APP_ONE = "mysql-test-app1" +TEST_APP_TWO = "mysql-test-app2" + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_labeling_of_k8s_endpoints(ops_test: OpsTest): + """Test the labeling of k8s endpoints when apps with same cluster-name deployed.""" + logger.info("Deploying first mysql cluster") + mysql_cluster_one = await deploy_and_scale_mysql( + ops_test, + check_for_existing_application=False, + mysql_application_name=MYSQL_CLUSTER_ONE, + cluster_name=MYSQL_CLUSTER_NAME, + ) + + logger.info("Deploying and relating test app with cluster") + await deploy_and_scale_application( + ops_test, + check_for_existing_application=False, + test_application_name=TEST_APP_ONE, + ) + + await relate_mysql_and_application( + ops_test, + mysql_application_name=MYSQL_CLUSTER_ONE, + application_name=TEST_APP_ONE, + ) + + logger.info("Deploying second mysql application with same cluster name") + mysql_cluster_two = await deploy_and_scale_mysql( + ops_test, + check_for_existing_application=False, + mysql_application_name=MYSQL_CLUSTER_TWO, + cluster_name=MYSQL_CLUSTER_NAME, + ) + + logger.info("Deploying and relating another test app with second cluster") + await deploy_and_scale_application( + ops_test, + check_for_existing_application=False, + test_application_name=TEST_APP_TWO, + ) + + await relate_mysql_and_application( + ops_test, + mysql_application_name=MYSQL_CLUSTER_TWO, + application_name=TEST_APP_TWO, + ) + + logger.info("Ensuring that the created k8s endpoints have correct addresses") + cluster_one_ips = [ + await get_unit_address(ops_test, unit.name) + for unit in ops_test.model.applications[mysql_cluster_one].units + ] + + cluster_one_primary_addresses = get_endpoint_addresses( + ops_test, f"{mysql_cluster_one}-primary" + ) + cluster_one_replica_addresses = get_endpoint_addresses( + ops_test, f"{mysql_cluster_one}-replicas" + ) + + for primary in cluster_one_primary_addresses: + assert ( + primary in cluster_one_ips + ), f"{primary} (not belonging to cluster 1) should not be in cluster one addresses" + + assert set(cluster_one_primary_addresses + cluster_one_replica_addresses) == set( + cluster_one_ips + ), "IPs not belonging to cluster one in cluster one addresses" + + cluster_two_ips = [ + await get_unit_address(ops_test, unit.name) + for unit in ops_test.model.applications[mysql_cluster_two].units + ] + + cluster_two_primary_addresses = get_endpoint_addresses( + ops_test, f"{mysql_cluster_two}-primary" + ) + cluster_two_replica_addresses = get_endpoint_addresses( + ops_test, f"{mysql_cluster_two}-replicas" + ) + + for primary in cluster_two_primary_addresses: + assert ( + primary in cluster_two_ips + ), f"{primary} (not belonging to cluster w) should not be in cluster two addresses" + + assert set(cluster_two_primary_addresses + cluster_two_replica_addresses) == set( + cluster_two_ips + ), "IPs not belonging to cluster two in cluster two addresses"