diff --git a/actions.yaml b/actions.yaml
index 4e4084fb5..f10580367 100644
--- a/actions.yaml
+++ b/actions.yaml
@@ -2,7 +2,13 @@
 # See LICENSE file for licensing details.
 
 get-cluster-status:
-  description: Get cluster status information without topology
+  description: Get cluster status information
+  params:
+    cluster-set:
+      type: boolean
+      default: False
+      description: Whether to fetch the cluster or cluster-set status.
+        Possible values are False (default) or True.
 
 get-password:
   description: Fetch the system user's password, which is used by charm.
@@ -28,7 +34,7 @@ set-password:
 
 set-tls-private-key:
   description: Set the privates key, which will be used for certificate signing requests (CSR). Run
-    for each unit separately.
+    for each unit separately.
   params:
     internal-key:
      type: string
@@ -55,3 +61,54 @@ pre-upgrade-check:
 
 resume-upgrade:
   description: Resume a rolling upgrade after asserting successful upgrade of a new revision.
+
+promote-standby-cluster:
+  description: |
+    Promotes this cluster to become the leader in the cluster-set. Used for safe switchover or failover.
+    Must be run against the charm leader unit of a standby cluster.
+  params:
+    cluster-set-name:
+      type: string
+      description: |
+        The name of the cluster-set. Mandatory option, used for confirmation.
+    force:
+      type: boolean
+      default: False
+      description: |
+        Use force when the previous primary is unreachable (failover). Will invalidate the previous
+        primary.
+
+recreate-cluster:
+  description: |
+    Recreates a cluster on one or more standalone units that were previously part of a standby cluster.
+
+    When a standby cluster is removed from an async replication relation, the cluster is dissolved and
+    each unit is kept in blocked status. Recreating the cluster allows it to rejoin the async replication
+    relation, or to be used as a standalone cluster.
+
+fence-writes:
+  description: |
+    Stops write traffic to a primary cluster of a ClusterSet.
+  params:
+    cluster-set-name:
+      type: string
+      description: |
+        The name of the cluster-set. Mandatory option, used for confirmation.
+
+unfence-writes:
+  description: |
+    Resumes write traffic to a primary cluster of a ClusterSet.
+  params:
+    cluster-set-name:
+      type: string
+      description: |
+        The name of the cluster-set. Mandatory option, used for confirmation.
+
+rejoin-cluster:
+  description: |
+    Rejoins an invalidated cluster to the cluster-set, after a previous failover or switchover.
+  params:
+    cluster-name:
+      type: string
+      description: |
+        The name of the cluster to be rejoined.
diff --git a/config.yaml b/config.yaml
index a92da4c89..d91194416 100644
--- a/config.yaml
+++ b/config.yaml
@@ -3,7 +3,12 @@
 options:
   cluster-name:
-    description: "Optional - Name of the MySQL InnoDB cluster"
+    description: "Optional - Name of the MySQL InnoDB cluster, set once at deployment"
+    type: "string"
+  cluster-set-name:
+    description: |
+      Optional - Name for async replication cluster set, set once at deployment.
+      On `recreate-cluster` action call, the cluster set name will be re-generated automatically.
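# ---------------------------------------------------------------------------
# Editor's note - illustrative sketch, not part of the patch. It shows how the
# new actions above might be driven from the operator side, assuming Juju 3.x
# action syntax; the application names ("db-main", "db-standby"), cluster names
# and "my-cluster-set" are placeholders for this example only.
import subprocess

def juju(*args: str) -> None:
    """Run a juju CLI command, echoing it first."""
    cmd = ["juju", *args]
    print("$", " ".join(cmd))
    subprocess.run(cmd, check=True)

# Planned switchover: promote the standby cluster to primary.
juju("run", "db-standby/leader", "promote-standby-cluster",
     "cluster-set-name=my-cluster-set")

# Failover when the previous primary is unreachable; this invalidates it.
juju("run", "db-standby/leader", "promote-standby-cluster",
     "cluster-set-name=my-cluster-set", "force=true")

# Once the invalidated cluster is reachable again, rejoin it to the set
# (run against the current primary cluster's leader).
juju("run", "db-standby/leader", "rejoin-cluster", "cluster-name=cluster-a")

# After removing the async relation, a dissolved standby can be rebuilt.
juju("run", "db-main/leader", "recreate-cluster")
# ---------------------------------------------------------------------------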
type: "string" profile: description: | diff --git a/lib/charms/data_platform_libs/v0/data_secrets.py b/lib/charms/data_platform_libs/v0/data_secrets.py index 254b9af3d..7adc91549 100644 --- a/lib/charms/data_platform_libs/v0/data_secrets.py +++ b/lib/charms/data_platform_libs/v0/data_secrets.py @@ -97,7 +97,7 @@ def get_content(self) -> Dict[str, str]: """Getting cached secret content.""" if not self._secret_content: if self.meta: - self._secret_content = self.meta.get_content() + self._secret_content = self.meta.get_content(refresh=True) return self._secret_content def set_content(self, content: Dict[str, str]) -> None: diff --git a/lib/charms/mysql/v0/async_replication.py b/lib/charms/mysql/v0/async_replication.py new file mode 100644 index 000000000..a3c0896f3 --- /dev/null +++ b/lib/charms/mysql/v0/async_replication.py @@ -0,0 +1,777 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +"""MySQL async replication module.""" + +import enum +import logging +import typing +import uuid +from functools import cached_property + +from charms.mysql.v0.mysql import ( + MySQLFencingWritesError, + MySQLPromoteClusterToPrimaryError, + MySQLRejoinClusterError, +) +from ops import ( + ActionEvent, + ActiveStatus, + BlockedStatus, + MaintenanceStatus, + Relation, + RelationDataContent, + Secret, + SecretNotFoundError, + WaitingStatus, +) +from ops.framework import Object +from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed +from typing_extensions import Optional + +from constants import ( + BACKUPS_PASSWORD_KEY, + BACKUPS_USERNAME, + CLUSTER_ADMIN_PASSWORD_KEY, + CLUSTER_ADMIN_USERNAME, + MONITORING_PASSWORD_KEY, + MONITORING_USERNAME, + ROOT_PASSWORD_KEY, + ROOT_USERNAME, + SERVER_CONFIG_PASSWORD_KEY, + SERVER_CONFIG_USERNAME, +) + +if typing.TYPE_CHECKING: + from charm import MySQLOperatorCharm + +logger = logging.getLogger(__name__) + +# The unique Charmhub library identifier, never change it +LIBID = "4de21f1a022c4e2c87ac8e672ec16f6a" +LIBAPI = 0 +LIBPATCH = 1 + +PRIMARY_RELATION = "async-primary" +REPLICA_RELATION = "async-replica" +SECRET_LABEL = "async-secret" + + +class ClusterSetInstanceState(typing.NamedTuple): + """Cluster set instance state.""" + + cluster_role: str # primary or replica + instance_role: str # primary or secondary + relation_side: str # primary or replica + + +class States(str, enum.Enum): + """States for the relation.""" + + UNINITIALIZED = "uninitialized" # relation is not initialized + SYNCING = "syncing" # credentials are being synced + INITIALIZING = "initializing" # cluster to be added + RECOVERING = "recovery" # replica cluster is being recovered + READY = "ready" # cluster set is ready + FAILED = "failed" # cluster set is in a failed state + + +class MySQLAsyncReplication(Object): + """MySQL async replication base class.""" + + def __init__(self, charm: "MySQLOperatorCharm", relation_name: str): + super().__init__(charm, relation_name) + self._charm = charm + + # relation broken is observed on all units + self.framework.observe( + self._charm.on[PRIMARY_RELATION].relation_broken, self.on_async_relation_broken + ) + self.framework.observe( + self._charm.on[REPLICA_RELATION].relation_broken, self.on_async_relation_broken + ) + + @cached_property + def role(self) -> ClusterSetInstanceState: + """Current cluster set role of the unit, after the relation is established.""" + is_replica = self._charm._mysql.is_cluster_replica() + + if is_replica: + cluster_role = "replica" + elif is_replica is False: + cluster_role = 
"primary" + else: + cluster_role = "unset" + + _, instance_role = self._charm._mysql.get_member_state() + + if self.model.get_relation(REPLICA_RELATION): + relation_side = "replica" + else: + relation_side = "primary" + + return ClusterSetInstanceState(cluster_role, instance_role, relation_side) + + @property + def cluster_name(self) -> str: + """This cluster name.""" + return self._charm.app_peer_data["cluster-name"] + + @property + def cluster_set_name(self) -> str: + """Cluster set name.""" + return self._charm.app_peer_data["cluster-set-domain-name"] + + def get_remote_relation_data(self, relation: Relation) -> Optional[RelationDataContent]: + """Remote data.""" + if not relation.app: + return + return relation.data[relation.app] + + def _on_promote_standby_cluster(self, event: ActionEvent) -> None: + """Promote a standby cluster to primary.""" + if not self._charm.unit.is_leader(): + event.fail("Only the leader unit can promote a standby cluster") + return + + if not self._charm._mysql.is_cluster_replica(): + event.fail("Only a standby cluster can be promoted") + return + + if event.params.get("cluster-set-name") != self.cluster_set_name: + event.fail("Invalid/empty cluster set name") + return + + # promote cluster to primary + cluster_name = self.cluster_name + force = event.params.get("force", False) + + try: + self._charm._mysql.promote_cluster_to_primary(cluster_name, force) + message = f"Cluster {cluster_name} promoted to primary" + logger.info(message) + event.set_results({"message": message}) + self._charm._on_update_status(None) + except MySQLPromoteClusterToPrimaryError: + logger.exception("Failed to promote cluster to primary") + event.fail("Failed to promote cluster to primary") + + def _on_fence_unfence_writes_action(self, event: ActionEvent) -> None: + """Fence or unfence writes to a cluster.""" + if event.params.get("cluster-set-name") != self.cluster_set_name: + event.fail("Invalid/empty cluster set name") + return + + if self.role.cluster_role == "replica": + event.fail("Only a primary cluster can have writes fenced/unfence") + return + + try: + if ( + event.handle.kind == "fence_writes_action" + and not self._charm._mysql.is_cluster_writes_fenced() + ): + logger.info("Fencing writes to the cluster") + self._charm._mysql.fence_writes() + event.set_results({"message": "Writes to the cluster are now fenced"}) + elif ( + event.handle.kind == "unfence_writes_action" + and self._charm._mysql.is_cluster_writes_fenced() + ): + logger.info("Unfencing writes to the cluster") + self._charm._mysql.unfence_writes() + event.set_results({"message": "Writes to the cluster are now resumed"}) + # update status + self._charm._on_update_status(None) + except MySQLFencingWritesError: + event.fail("Failed to fence writes. Check logs for details") + + def on_async_relation_broken(self, event): # noqa: C901 + """Handle the async relation being broken from either side.""" + # Remove the replica cluster, if this is the primary + + if self.role.cluster_role in ("replica", "unset") and not self._charm.removing_unit: + # The cluster being removed is a replica cluster + # role is `unset` when the primary cluster dissolved the replica before + # this hook execution i.e. 
was faster on running the handler + + self._charm.unit.status = WaitingStatus("Waiting for cluster to be dissolved") + try: + # hold execution until the cluster is dissolved + for attempt in Retrying(stop=stop_after_attempt(30), wait=wait_fixed(10)): + with attempt: + if self._charm._mysql.is_instance_in_cluster(self._charm.unit_label): + logger.debug("Waiting for cluster to be dissolved") + raise Exception + except RetryError: + self._charm.unit.status = BlockedStatus( + "Replica cluster not dissolved after relation broken" + ) + logger.warning( + "Replica cluster not dissolved after relation broken by the primary cluster." + "\n\tThis happens when the primary cluster was removed prior to removing the async relation." + "\n\tThis cluster can be promoted to primary with the `promote-standby-cluster` action." + ) + return + + self._charm.unit.status = BlockedStatus("Standalone read-only unit.") + # reset flag to allow instances rejoining the cluster + self._charm.unit_peer_data["member-state"] = "waiting" + del self._charm.unit_peer_data["unit-initialized"] + if self._charm.unit.is_leader(): + self._charm.app.status = BlockedStatus("Recreate or rejoin cluster.") + logger.info( + "\n\tThis is a replica cluster and will be dissolved.\n" + "\tThe cluster can be recreated with the `recreate-cluster` action.\n" + "\tAlternatively the cluster can be rejoined to the cluster set." + ) + # reset the cluster node count flag + del self._charm.app_peer_data["units-added-to-cluster"] + # set flag to persist removed from cluster-set state + self._charm.app_peer_data["removed-from-cluster-set"] = "true" + + elif self.role.cluster_role == "primary": + if self._charm.unit.is_leader(): + # only leader units can remove replica clusters + remote_data = self.get_remote_relation_data(event.relation) or {} + if cluster_name := remote_data.get("cluster-name"): + if self._charm._mysql.is_cluster_in_cluster_set(cluster_name): + self._charm.unit.status = MaintenanceStatus("Removing replica cluster") + logger.debug(f"Removing replica cluster {cluster_name}") + + # force removal when cluster is invalidated + force = self._charm._mysql.get_replica_cluster_status(cluster_name) in [ + "invalidated", + "unknown", + ] + + self._charm._mysql.remove_replica_cluster(cluster_name, force=force) + logger.debug(f"Replica cluster {cluster_name} removed") + self._charm.unit.status = ActiveStatus(self._charm.active_status_message) + else: + logger.warning( + f"Replica cluster {cluster_name} not found in cluster set, skipping removal" + ) + else: + # Relation being broken before setup, e.g.: due to replica with user data + logger.warning("No cluster name found, skipping removal") + elif self._charm.unit_peer_data.get("member-state") == "waiting": + # set member-state to allow status update + # needed for secondaries status update when removing due to replica with user data + self._charm.unit_peer_data["member-state"] = "unknown" + self._charm._on_update_status(None) + + def _on_rejoin_cluster_action(self, event: ActionEvent) -> None: + """Rejoin cluster to cluster set action handler.""" + cluster = event.params.get("cluster-name") + if not cluster: + message = "Invalid/empty cluster name" + event.fail(message) + logger.info(message) + return + + if not self._charm._mysql.is_cluster_in_cluster_set(cluster): + message = f"Cluster {cluster=} not found in cluster set" + event.fail(message) + logger.info(message) + return + + status = self._charm._mysql.get_replica_cluster_status(cluster) + if status != "invalidated": + message = f"Cluster 
{status=}. Only `invalidated` clusters can be rejoined" + event.fail(message) + logger.info(message) + return + + try: + self._charm._mysql.rejoin_cluster(cluster) + message = f"{cluster=} rejoined to cluster set" + logger.info(message) + event.set_results({"message": message}) + except MySQLRejoinClusterError: + message = f"Failed to rejoin {cluster=} to the cluster set" + event.fail(message) + logger.error(message) + + +class MySQLAsyncReplicationPrimary(MySQLAsyncReplication): + """MySQL async replication primary side. + + Implements the setup phase of the async replication for the primary side. + """ + + def __init__(self, charm: "MySQLOperatorCharm"): + super().__init__(charm, PRIMARY_RELATION) + + # Actions observed only on the primary class to avoid duplicated execution + # promotion action since both classes are always instantiated + self.framework.observe( + self._charm.on.promote_standby_cluster_action, self._on_promote_standby_cluster + ) + + # fence writes action + self.framework.observe( + self._charm.on.fence_writes_action, self._on_fence_unfence_writes_action + ) + # unfence writes action + self.framework.observe( + self._charm.on.unfence_writes_action, self._on_fence_unfence_writes_action + ) + # rejoin invalidated cluster action + self.framework.observe( + self._charm.on.rejoin_cluster_action, self._on_rejoin_cluster_action + ) + + if self._charm.unit.is_leader(): + self.framework.observe( + self._charm.on[PRIMARY_RELATION].relation_created, self._on_primary_created + ) + self.framework.observe( + self._charm.on[PRIMARY_RELATION].relation_changed, + self._on_primary_relation_changed, + ) + + def get_relation(self, relation_id: int) -> Optional[Relation]: + """Return the relation.""" + return self.model.get_relation(PRIMARY_RELATION, relation_id) + + def get_local_relation_data(self, relation: Relation) -> Optional[RelationDataContent]: + """Local data.""" + return relation.data[self.model.app] + + def get_state(self, relation: Relation) -> Optional[States]: + """State of the relation, on primary side.""" + if not relation: + return States.UNINITIALIZED + + local_data = self.get_local_relation_data(relation) + remote_data = self.get_remote_relation_data(relation) or {} + + if not local_data: + return States.UNINITIALIZED + + if local_data.get("is-replica") == "true": + return States.FAILED + + if local_data.get("secret-id") and not remote_data.get("endpoint"): + return States.SYNCING + + if local_data.get("secret-id") and remote_data.get("endpoint"): + # evaluate cluster status + replica_status = self._charm._mysql.get_replica_cluster_status( + remote_data["cluster-name"] + ) + if replica_status in ["ok", "invalidated"]: + return States.READY + elif replica_status == "unknown": + return States.INITIALIZING + else: + return States.RECOVERING + + @property + def idle(self) -> bool: + """Whether the async replication is idle for all related clusters.""" + if not self.model.unit.is_leader(): + # non leader units are always idle + return True + + for relation in self.model.relations[PRIMARY_RELATION]: + if self.get_state(relation) not in [States.READY, States.UNINITIALIZED]: + return False + return True + + def _get_secret(self) -> Secret: + """Return async replication necessary secrets.""" + try: + # Avoid recreating the secret + secret = self._charm.model.get_secret(label=SECRET_LABEL) + if not secret.id: + # workaround for the secret id not being set with model uuid + secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" + return secret + except 
SecretNotFoundError: + pass + + app_secret = self._charm.model.get_secret(label=f"{self.model.app.name}.app") + content = app_secret.peek_content() + # filter out unnecessary secrets + shared_content = dict(filter(lambda x: "password" in x[0], content.items())) + + return self._charm.model.app.add_secret(content=shared_content, label=SECRET_LABEL) + + def _on_primary_created(self, event): + """Validate relations and share credentials with replica cluster.""" + if not self._charm.unit_initialized: + logger.debug("Unit not initialized, deferring event") + event.defer() + return + + if self._charm._mysql.is_cluster_replica(): + logger.error( + f"This is a replica cluster, cannot be related as {PRIMARY_RELATION}. Remove relation." + ) + self._charm.unit.status = BlockedStatus( + f"This is a replica cluster. Unrelate from the {PRIMARY_RELATION} relation" + ) + event.relation.data[self.model.app]["is-replica"] = "true" + return + + self._charm.app.status = MaintenanceStatus("Setting up async replication") + logger.debug("Granting secrets access to async replication relation") + secret = self._get_secret() + secret_id = secret.id + secret.grant(event.relation) + + # get workload version + version = self._charm._mysql.get_mysql_version() + + logger.debug(f"Sharing {secret_id=} with replica cluster") + # Set variables for credential sync and validations + event.relation.data[self.model.app].update( + { + "secret-id": secret_id, + "cluster-name": self.cluster_name, + "mysql-version": version, + "cluster-set-name": self.cluster_set_name, + } + ) + + def _on_primary_relation_changed(self, event): + """Handle the async_primary relation being changed.""" + state = self.get_state(event.relation) + + if state == States.INITIALIZING: + # Add replica cluster primary node + logger.debug("Initializing replica cluster") + self._charm.unit.status = MaintenanceStatus("Adding replica cluster") + remote_data = self.get_remote_relation_data(event.relation) or {} + + cluster = remote_data["cluster-name"] + endpoint = remote_data["endpoint"] + unit_label = remote_data["node-label"] + + logger.debug("Looking for a donor node") + _, ro, _ = self._charm._mysql.get_cluster_endpoints(get_ips=False) + + if not ro: + logger.debug(f"Adding replica {cluster=} with {endpoint=}. Primary is the donor") + self._charm._mysql.create_replica_cluster( + endpoint, cluster, instance_label=unit_label + ) + else: + donor = ro.split(",")[0] + logger.debug(f"Adding replica {cluster=} with {endpoint=} using {donor=}") + self._charm._mysql.create_replica_cluster( + endpoint, cluster, instance_label=unit_label, donor=donor + ) + + event.relation.data[self.model.app]["replica-state"] = "initialized" + logger.debug("Replica cluster created") + self._charm._on_update_status(None) + + elif state == States.RECOVERING: + # Recover replica cluster + self._charm.unit.status = MaintenanceStatus("Replica cluster in recovery") + + +class MySQLAsyncReplicationReplica(MySQLAsyncReplication): + """MySQL async replication replica side. + + Implements the setup phase of the async replication for the replica side. 
+ """ + + def __init__(self, charm: "MySQLOperatorCharm"): + super().__init__(charm, REPLICA_RELATION) + + if self._charm.unit.is_leader(): + # leader/primary + self.framework.observe( + self._charm.on[REPLICA_RELATION].relation_created, self._on_replica_created + ) + self.framework.observe( + self._charm.on[REPLICA_RELATION].relation_changed, self._on_replica_changed + ) + else: + # non-leader/secondaries + self.framework.observe( + self._charm.on[REPLICA_RELATION].relation_created, + self._on_replica_non_leader_created, + ) + self.framework.observe( + self._charm.on[REPLICA_RELATION].relation_changed, + self._on_replica_non_leader_changed, + ) + + @property + def relation(self) -> Optional[Relation]: + """Relation.""" + return self.model.get_relation(REPLICA_RELATION) + + @property + def relation_data(self) -> RelationDataContent: + """Relation data.""" + return self.relation.data[self.model.app] + + @property + def remote_relation_data(self) -> Optional[RelationDataContent]: + """Relation data.""" + if not self.relation.app: + return + return self.relation.data[self.relation.app] + + @property + def state(self) -> Optional[States]: + """State of the relation, on replica side.""" + if not self.relation: + return None + + if self.relation_data.get("user-data-found") == "true": + return States.FAILED + + if self.remote_relation_data.get("secret-id") and not self.relation_data.get("endpoint"): + # received credentials from primary cluster + # and did not synced credentials + return States.SYNCING + + if self.replica_initialized: + # cluster added to cluster-set by primary cluster + if self._charm.cluster_fully_initialized: + return States.READY + return States.RECOVERING + return States.INITIALIZING + + @property + def idle(self) -> bool: + """Whether the async replication is idle.""" + if not self.model.unit.is_leader(): + # non leader units are always idle + return True + + return self.state in [States.READY, None] + + @property + def returning_cluster(self) -> bool: + """Whether to skip checks. + + Used for skipping checks when a replica cluster was removed through broken relation. + """ + remote_cluster_set_name = self.remote_relation_data.get("cluster-set-name") + return ( + self._charm.app_peer_data.get("removed-from-cluster-set") == "true" + and self.cluster_set_name == remote_cluster_set_name + ) + + @property + def replica_initialized(self) -> bool: + """Whether the replica cluster is initialized as such.""" + return self.remote_relation_data.get("replica-state") == "initialized" + + def _check_version(self) -> bool: + """Check if the MySQL version is compatible with the primary cluster.""" + remote_version = self.remote_relation_data.get("mysql-version") + local_version = self._charm._mysql.get_mysql_version() + + if not remote_version: + return False + + if remote_version != local_version: + logger.error( + f"Primary cluster MySQL version {remote_version} is not compatible with this cluster MySQL version {local_version}" + ) + return False + + return True + + def _obtain_secret(self) -> Secret: + """Get secret from primary cluster.""" + secret_id = self.remote_relation_data.get("secret-id") + return self._charm.model.get_secret(id=secret_id, label=SECRET_LABEL) + + def _async_replication_credentials(self) -> dict[str, str]: + """Get async replication credentials from primary cluster.""" + secret = self._obtain_secret() + return secret.peek_content() + + def _get_endpoint(self) -> str: + """Get endpoint to be used by the primary cluster. 
+ + This is the address in which the unit must be reachable from the primary cluster. + Not necessarily the locally resolved address, but an ingress address. + """ + # TODO: devise method to inform the real address + # using unit informed address (fqdn or ip) + return self._charm.unit_address + + def _on_replica_created(self, event): + """Handle the async_replica relation being created on the leader unit.""" + if not self._charm.unit_initialized and not self.returning_cluster: + # avoid running too early for non returning clusters + logger.debug("Unit not initialized, deferring event") + event.defer() + return + if self.returning_cluster: + # flag set on prior async relation broken + # allows the relation to be created with user data so + # rejoining to the cluster-set can be done incrementally + # on incompatible user data, join fallbacks to clone + logger.debug("User data check skipped") + else: + logger.debug("Checking for user data") + if self._charm._mysql.get_non_system_databases(): + # don't check for user data if skip flag is set + logger.info( + "\n\tUser data found, aborting async replication setup." + "\n\tEnsure the cluster has no user data before trying to join a cluster set." + "\n\tAfter removing/backing up the data, remove the relation and add it again." + ) + self._charm.app.status = BlockedStatus( + "User data found, check instruction in the log" + ) + self._charm.unit.status = BlockedStatus( + "User data found, aborting async replication setup" + ) + self.relation_data["user-data-found"] = "true" + return + + self._charm.app.status = MaintenanceStatus("Setting up async replication") + self._charm.unit.status = WaitingStatus("Awaiting sync data from primary cluster") + + def _on_replica_changed(self, event): # noqa: C901 + """Handle the async_replica relation being changed.""" + state = self.state + logger.debug(f"Replica cluster {state.value=}") + + if state == States.SYNCING: + if self.returning_cluster: + # when running from and async relation broken + # re-create the cluster and wait + logger.debug("Recreating cluster prior to sync credentials") + self._charm.create_cluster() + # (re)set flags + self._charm.app_peer_data.update( + {"removed-from-cluster-set": "", "rejoin-secondaries": "true"} + ) + event.defer() + return + if not self._charm.cluster_fully_initialized: + # cluster is not fully initialized + # avoid race on credentials sync + logger.debug( + "Cluster not fully initialized yet, waiting until all units join the cluster" + ) + event.defer() + return + + if not self._check_version(): + self._charm.unit.status = BlockedStatus( + "MySQL version mismatch with primary cluster. Check logs for details" + ) + logger.error("MySQL version mismatch with primary cluster. 
Remove relation.") + return + + logger.debug("Syncing credentials from primary cluster") + self._charm.unit.status = MaintenanceStatus("Syncing credentials") + self._charm.app.status = MaintenanceStatus("Setting up async replication") + + try: + credentials = self._async_replication_credentials() + except SecretNotFoundError: + logger.debug("Secret not found, deferring event") + event.defer() + return + sync_keys = { + SERVER_CONFIG_PASSWORD_KEY: SERVER_CONFIG_USERNAME, + CLUSTER_ADMIN_PASSWORD_KEY: CLUSTER_ADMIN_USERNAME, + MONITORING_PASSWORD_KEY: MONITORING_USERNAME, + BACKUPS_PASSWORD_KEY: BACKUPS_USERNAME, + ROOT_PASSWORD_KEY: ROOT_USERNAME, + } + + for key, password in credentials.items(): + # sync credentials only for necessary users + user = sync_keys[key] + if user == ROOT_USERNAME: + # root user is only local + self._charm._mysql.update_user_password(user, password, host="localhost") + else: + self._charm._mysql.update_user_password(user, password) + self._charm.set_secret("app", key, password) + logger.debug(f"Synced {user=} password") + + self._charm.unit.status = MaintenanceStatus("Dissolving replica cluster") + logger.debug("Dissolving replica cluster") + self._charm._mysql.dissolve_cluster() + # reset the cluster node count flag + del self._charm.app_peer_data["units-added-to-cluster"] + # reset force rejoin-secondaries flag + del self._charm.app_peer_data["rejoin-secondaries"] + + if self.remote_relation_data["cluster-name"] == self.cluster_name: + # this cluster need a new cluster name + logger.warning( + "Cluster name is the same as the primary cluster. Appending generatade value" + ) + self._charm.app_peer_data[ + "cluster-name" + ] = f"{self.cluster_name}{uuid.uuid4().hex[:4]}" + + self._charm.unit.status = MaintenanceStatus("Populate endpoint") + + # this cluster name is used by the primary cluster to identify the replica cluster + self.relation_data["cluster-name"] = self.cluster_name + # the reachable endpoint address + self.relation_data["endpoint"] = self._get_endpoint() + # the node label in the replica cluster to be created + self.relation_data["node-label"] = self._charm.unit_label + + logger.debug("Data for adding replica cluster shared with primary cluster") + + self._charm.unit.status = WaitingStatus("Waiting for primary cluster") + elif state == States.READY: + # update status + logger.debug("Replica cluster primary is ready") + + # sync cluster-set domain name across clusters + if cluster_set_domain_name := self._charm._mysql.get_cluster_set_name(): + self._charm.app_peer_data["cluster-set-domain-name"] = cluster_set_domain_name + + # set the number of units added to the cluster for a single unit replica cluster + # needed here since it will skip the `RECOVERING` state + if self._charm.app.planned_units() == 1: + self._charm.app_peer_data["units-added-to-cluster"] = "1" + + self._charm._on_update_status(None) + elif state == States.RECOVERING: + # recovering cluster (copying data and/or joining units) + self._charm.app.status = MaintenanceStatus("Recovering replica cluster") + self._charm.unit.status = WaitingStatus( + "Waiting for recovery to complete on other units" + ) + logger.debug("Awaiting other units to join the cluster") + # reset the number of units added to the cluster + # this will trigger secondaries to join the cluster + node_count = self._charm._mysql.get_cluster_node_count() + self._charm.app_peer_data["units-added-to-cluster"] = str(node_count) + event.defer() + + def _on_replica_non_leader_created(self, _): + """Handle the async_replica 
relation being created for secondaries/non-leader.""" + # set waiting state to inhibit auto recovery, only when not already set + if not self._charm.unit_peer_data.get("member-state") == "waiting": + self._charm.unit_peer_data["member-state"] = "waiting" + self._charm.unit.status = WaitingStatus("waiting replica cluster be configured") + + def _on_replica_non_leader_changed(self, _): + """Reset cluster secondaries to allow cluster rejoin after primary recovery.""" + # the replica state is initialized when the primary cluster finished + # creating the replica cluster on this cluster primary/leader unit + if ( + self.replica_initialized + or self._charm.app_peer_data.get("rejoin-secondaries") == "true" + ) and not self._charm._mysql.is_instance_in_cluster(self._charm.unit_label): + logger.debug("Reset secondary unit to allow cluster rejoin") + # reset unit flag to allow cluster rejoin after primary recovery + # the unit will rejoin on the next peer relation changed or update status + del self._charm.unit_peer_data["unit-initialized"] + self._charm.unit_peer_data["member-state"] = "waiting" + self._charm.unit.status = WaitingStatus("waiting to join the cluster") diff --git a/lib/charms/mysql/v0/mysql.py b/lib/charms/mysql/v0/mysql.py index 2be32065a..ccb07cb1c 100644 --- a/lib/charms/mysql/v0/mysql.py +++ b/lib/charms/mysql/v0/mysql.py @@ -68,6 +68,7 @@ def wait_until_mysql_connection(self) -> None: import configparser import dataclasses import enum +import hashlib import io import json import logging @@ -93,6 +94,7 @@ def wait_until_mysql_connection(self) -> None: CLUSTER_ADMIN_PASSWORD_KEY, CLUSTER_ADMIN_USERNAME, COS_AGENT_RELATION_NAME, + GR_MAX_MEMBERS, MONITORING_PASSWORD_KEY, MONITORING_USERNAME, PASSWORD_LENGTH, @@ -115,7 +117,7 @@ def wait_until_mysql_connection(self) -> None: # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 57 +LIBPATCH = 58 UNIT_TEARDOWN_LOCKNAME = "unit-teardown" UNIT_ADD_LOCKNAME = "unit-add" @@ -362,6 +364,26 @@ class MySQLGetAvailableMemoryError(Error): """Exception raised when there is an issue getting the available memory.""" +class MySQLCreateReplicaClusterError(Error): + """Exception raised when there is an issue creating a replica cluster.""" + + +class MySQLRemoveReplicaClusterError(Error): + """Exception raised when there is an issue removing a replica cluster.""" + + +class MySQLPromoteClusterToPrimaryError(Error): + """Exception raised when there is an issue promoting a replica cluster to primary.""" + + +class MySQLFencingWritesError(Error): + """Exception raised when there is an issue fencing or unfencing writes.""" + + +class MySQLRejoinClusterError(Error): + """Exception raised when there is an issue trying to rejoin a cluster to the cluster set.""" + + @dataclasses.dataclass class RouterUser: """MySQL Router user.""" @@ -423,6 +445,7 @@ def __init__(self, *args): self.framework.observe(self.on.get_cluster_status_action, self._get_cluster_status) self.framework.observe(self.on.get_password_action, self._on_get_password) self.framework.observe(self.on.set_password_action, self._on_set_password) + self.framework.observe(self.on.recreate_cluster_action, self._recreate_cluster) # Set in some event handlers in order to avoid passing event down a chain # of methods @@ -495,7 +518,14 @@ def _on_set_password(self, event: ActionEvent) -> None: def _get_cluster_status(self, event: ActionEvent) -> None: """Action used to retrieve the cluster status.""" - if status := 
self._mysql.get_cluster_status(): + if event.params.get("cluster-set"): + logger.debug("Getting cluster set status") + status = self._mysql.get_cluster_set_status(extended=0) + else: + logger.debug("Getting cluster status") + status = self._mysql.get_cluster_status() + + if status: event.set_results( { "success": True, @@ -510,18 +540,71 @@ def _get_cluster_status(self, event: ActionEvent) -> None: } ) + def _recreate_cluster(self, event: ActionEvent) -> None: + """Action used to recreate the cluster, for special cases.""" + if not self.unit.is_leader(): + event.fail("recreate-cluster action can only be run on the leader unit.") + return + + if self.app_peer_data.get("removed-from-cluster-set"): + # remove the flag if it exists. Allow further cluster rejoin + del self.app_peer_data["removed-from-cluster-set"] + + # reset cluster-set-name to config or previous value + hash = self.generate_random_hash() + self.app_peer_data["cluster-set-domain-name"] = self.model.config.get( + "cluster-set-name", f"cluster-set-{hash}" + ) + + logger.info("Recreating cluster") + try: + self.create_cluster() + self.unit.status = ops.ActiveStatus(self.active_status_message) + except (MySQLCreateClusterError, MySQLCreateClusterSetError) as e: + logger.exception("Failed to recreate cluster") + event.fail(str(e)) + + def create_cluster(self) -> None: + """Create the MySQL InnoDB cluster on the unit. + + Should only be run by the leader unit. + """ + self._mysql.create_cluster(self.unit_label) + self._mysql.create_cluster_set() + self._mysql.initialize_juju_units_operations_table() + # rescan cluster for cleanup of unused + # recovery users + self._mysql.rescan_cluster() + self.app_peer_data["units-added-to-cluster"] = "1" + + state, role = self._mysql.get_member_state() + + self.unit_peer_data.update( + {"member-state": state, "member-role": role, "unit-initialized": "True"} + ) + @property def peers(self) -> Optional[ops.model.Relation]: """Retrieve the peer relation.""" return self.model.get_relation(PEER) @property - def cluster_initialized(self): + def cluster_initialized(self) -> bool: """Returns True if the cluster is initialized.""" return int(self.app_peer_data.get("units-added-to-cluster", "0")) >= 1 @property - def unit_initialized(self): + def cluster_fully_initialized(self) -> bool: + """Returns True if the cluster is fully initialized. + + Fully initialized means that all unit that can be joined are joined. 
+ """ + return self._mysql.get_cluster_node_count(node_status=MySQLMemberState["ONLINE"]) == min( + GR_MAX_MEMBERS, self.app.planned_units() + ) + + @property + def unit_initialized(self) -> bool: """Return True if the unit is initialized.""" return self.unit_peer_data.get("unit-initialized") == "True" @@ -555,7 +638,7 @@ def unit_label(self): return self.unit.name.replace("/", "-") @property - def _is_peer_data_set(self): + def _is_peer_data_set(self) -> bool: return bool( self.app_peer_data.get("cluster-name") and self.get_secret("app", ROOT_PASSWORD_KEY) @@ -581,6 +664,28 @@ def has_cos_relation(self) -> bool: return len(active_cos_relations) > 0 + @property + def active_status_message(self) -> str: + """Active status message.""" + if self.unit_peer_data.get("member-role") == "primary": + if self._mysql.is_cluster_replica(): + status = self._mysql.get_replica_cluster_status() + if status == "ok": + return "Primary (standby)" + else: + return f"Primary (standby, {status})" + elif self._mysql.is_cluster_writes_fenced(): + return "Primary (fenced writes)" + else: + return "Primary" + + return "" + + @property + def removing_unit(self) -> bool: + """Check if the unit is being removed.""" + return self.unit_peer_data.get("unit-status") == "removing" + def peer_relation_data(self, scope: Scopes) -> DataPeer: """Returns the peer relation data per scope.""" if scope == APP_SCOPE: @@ -644,6 +749,16 @@ def remove_secret(self, scope: Scopes, key: str) -> None: peers = self.model.get_relation(PEER) self.peer_relation_data(scope).delete_relation_data(peers.id, [key]) + @staticmethod + def generate_random_hash() -> str: + """Generate a hash based on a random string. + + Returns: + A hash based on a random string. + """ + random_characters = generate_random_password(10) + return hashlib.md5(random_characters.encode("utf-8")).hexdigest() + class MySQLMemberState(str, enum.Enum): """MySQL Cluster member state.""" @@ -659,6 +774,13 @@ class MySQLMemberState(str, enum.Enum): UNKNOWN = "unknown" +class MySQLClusterState(str, enum.Enum): + """MySQL Cluster state.""" + + OK = "ok" + FENCED = "fenced_writes" + + class MySQLTextLogs(str, enum.Enum): """MySQL Text logs.""" @@ -792,12 +914,15 @@ def render_mysqld_configuration( config.write(string_io) return string_io.getvalue(), dict(config["mysqld"]) - def configure_mysql_users(self): + def configure_mysql_users(self, password_needed: bool = True) -> None: """Configure the MySQL users for the instance. Create `@%` user with the appropriate privileges, and reconfigure `root@localhost` user password. + Args: + password_needed: flag to indicate if the root password is needed. Default is True. + Raises MySQLConfigureMySQLUsersError if the user creation fails. 
""" # SYSTEM_USER and SUPER privileges to revoke from the root users @@ -836,10 +961,13 @@ def configure_mysql_users(self): try: logger.debug(f"Configuring MySQL users for {self.instance_address}") - self._run_mysqlcli_script( - "; ".join(configure_users_commands), - password=self.root_password, - ) + if password_needed: + self._run_mysqlcli_script( + "; ".join(configure_users_commands), + password=self.root_password, + ) + else: + self._run_mysqlcli_script("; ".join(configure_users_commands)) except MySQLClientError as e: logger.exception( f"Failed to configure users for: {self.instance_address} with error {e.message}", @@ -1209,6 +1337,191 @@ def create_cluster_set(self) -> None: logger.exception("Failed to add instance to cluster set on instance") raise MySQLCreateClusterSetError + def create_replica_cluster( + self, + endpoint: str, + replica_cluster_name: str, + instance_label: str, + donor: Optional[str] = None, + method: Optional[str] = "auto", + ) -> None: + """Create a replica cluster from the primary cluster. + + Args: + endpoint: The endpoint of the replica cluster leader unit + replica_cluster_name: The name of the replica cluster + instance_label: The label to apply to the replica cluster instance + donor: The donor instance address definition to clone from + method: The method to use to create the replica cluster (auto, clone) + + Raises: + MySQLCreateReplicaClusterError + """ + options = { + "recoveryProgress": 0, + "recoveryMethod": method, + "timeout": 0, + "communicationStack": "MySQL", + } + + if donor: + options["cloneDonor"] = donor + + commands = ( + f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + "cs = dba.get_cluster_set()", + f"repl_cluster = cs.create_replica_cluster('{endpoint}','{replica_cluster_name}', {options})", + f"repl_cluster.set_instance_option('{endpoint}', 'label', '{instance_label}')", + ) + + try: + logger.debug(f"Creating replica cluster {replica_cluster_name}") + self._run_mysqlsh_script("\n".join(commands)) + except MySQLClientError: + if method == "auto": + logger.warning( + "Failed to create replica cluster with auto method, fallback to clone method" + ) + self.create_replica_cluster( + endpoint, + replica_cluster_name, + instance_label, + donor, + method="clone", + ) + else: + logger.exception("Failed to create replica cluster") + raise MySQLCreateReplicaClusterError + + def promote_cluster_to_primary(self, cluster_name: str, force: bool = False) -> None: + """Promote a cluster to become the primary cluster on the cluster set. + + Args: + cluster_name: The name of the cluster to promote + force: Whether to force the promotion (due to a unreachable cluster) + + Raises: + MySQLPromoteClusterToActiveError + """ + commands = ( + f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + "cs = dba.get_cluster_set()", + ( + f"cs.force_primary_cluster('{cluster_name}')" + if force + else f"cs.set_primary_cluster('{cluster_name}')" + ), + ) + + try: + logger.debug(f"Promoting {cluster_name=} to primary with {force=}") + self._run_mysqlsh_script("\n".join(commands)) + except MySQLClientError: + logger.exception("Failed to promote cluster to primary") + raise MySQLPromoteClusterToPrimaryError + + def fence_writes(self) -> None: + """Fence writes on the primary cluster. 
+ + Raises: + MySQLFenceUnfenceWritesError + """ + commands = ( + f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + "c = dba.get_cluster()", + "c.fence_writes()", + ) + + try: + self._run_mysqlsh_script("\n".join(commands)) + except MySQLClientError: + logger.exception("Failed to fence writes on cluster") + raise MySQLFencingWritesError + + def unfence_writes(self) -> None: + """Unfence writes on the primary cluster and reset read_only flag. + + Raises: + MySQLFenceUnfenceWritesError + """ + commands = ( + f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + "c = dba.get_cluster()", + "c.unfence_writes()", + "session.run_sql('SET GLOBAL read_only=OFF')", + ) + + try: + self._run_mysqlsh_script("\n".join(commands)) + except MySQLClientError: + logger.exception("Failed to resume writes on primary cluster") + raise MySQLFencingWritesError + + def is_cluster_writes_fenced(self) -> Optional[bool]: + """Check if the cluster is fenced against writes. + + Returns: + True if the cluster is fenced, False otherwise + """ + status = self.get_cluster_status() + if not status: + return + + return status["defaultreplicaset"]["status"] == MySQLClusterState.FENCED + + def is_cluster_in_cluster_set(self, cluster_name: str) -> Optional[bool]: + """Check if a cluster is in the cluster set.""" + cs_status = self.get_cluster_set_status(extended=0) + + if cs_status is None: + return None + + return cluster_name in cs_status["clusters"] + + def rejoin_cluster(self, cluster_name) -> None: + """Try to rejoin a cluster to the cluster set.""" + commands = ( + f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + "cs = dba.get_cluster_set()", + f"cs.rejoin_cluster('{cluster_name}')", + ) + + try: + logger.debug(f"Rejoining {cluster_name=}") + self._run_mysqlsh_script("\n".join(commands)) + logger.info(f"Rejoined {cluster_name=}") + except MySQLClientError: + logger.exception("Failed to rejoin cluster") + raise MySQLRejoinClusterError + + def remove_replica_cluster(self, replica_cluster_name: str, force: bool = False) -> None: + """Remove a replica cluster on the primary cluster. + + The removed cluster will be implicitly dissolved. + + Args: + replica_cluster_name: The name of the replica cluster + force: Whether to force the removal of the replica cluster + + Raises: + MySQLRemoveReplicaClusterError + """ + commands = [ + f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + "cs = dba.get_cluster_set()", + ] + if force: + commands.append(f"cs.remove_cluster('{replica_cluster_name}', {{'force': True}})") + else: + commands.append(f"cs.remove_cluster('{replica_cluster_name}')") + + try: + logger.debug(f"Removing replica cluster {replica_cluster_name}") + self._run_mysqlsh_script("\n".join(commands)) + except MySQLClientError: + logger.exception("Failed to remove replica cluster") + raise MySQLRemoveReplicaClusterError + def initialize_juju_units_operations_table(self) -> None: """Initialize the mysql.juju_units_operations table using the serverconfig user. 
@@ -1217,7 +1530,8 @@ def initialize_juju_units_operations_table(self) -> None: initializing the juju_units_operations table """ initialize_table_commands = ( - "CREATE TABLE IF NOT EXISTS mysql.juju_units_operations (task varchar(20), executor " + "DROP TABLE IF EXISTS mysql.juju_units_operations", + "CREATE TABLE mysql.juju_units_operations (task varchar(20), executor " "varchar(20), status varchar(20), primary key(task))", f"INSERT INTO mysql.juju_units_operations values ('{UNIT_TEARDOWN_LOCKNAME}', '', " "'not-started') ON DUPLICATE KEY UPDATE executor = '', status = 'not-started'", @@ -1244,9 +1558,11 @@ def initialize_juju_units_operations_table(self) -> None: def add_instance_to_cluster( self, + *, instance_address: str, instance_unit_label: str, from_instance: Optional[str] = None, + lock_instance: Optional[str] = None, method: str = "auto", ) -> None: """Add an instance to the InnoDB cluster. @@ -1261,6 +1577,7 @@ def add_instance_to_cluster( instance_address: address of the instance to add to the cluster instance_unit_label: the label/name of the unit from_instance: address of the adding instance, e.g. primary + lock_instance: address of the instance to lock on method: recovery method to use, either "auto" or "clone" """ options = { @@ -1268,8 +1585,12 @@ def add_instance_to_cluster( "label": instance_unit_label, } + local_lock_instance = lock_instance or from_instance or self.instance_address + if not self._acquire_lock( - from_instance or self.instance_address, instance_unit_label, UNIT_ADD_LOCKNAME + local_lock_instance, + instance_unit_label, + UNIT_ADD_LOCKNAME, ): raise MySQLLockAcquisitionError("Lock not acquired") @@ -1306,13 +1627,15 @@ def add_instance_to_cluster( f"Cannot add {instance_address=} to {self.cluster_name=} with recovery {method=}. Trying method 'clone'" ) self.add_instance_to_cluster( - instance_address, instance_unit_label, from_instance, method="clone" + instance_address=instance_address, + instance_unit_label=instance_unit_label, + from_instance=from_instance, + lock_instance=lock_instance, + method="clone", ) finally: # always release the lock - self._release_lock( - from_instance or self.instance_address, instance_unit_label, UNIT_ADD_LOCKNAME - ) + self._release_lock(local_lock_instance, instance_unit_label, UNIT_ADD_LOCKNAME) def is_instance_configured_for_innodb( self, instance_address: str, instance_unit_label: str @@ -1464,23 +1787,101 @@ def get_cluster_status(self, extended: Optional[bool] = False) -> Optional[dict] except MySQLClientError: logger.error(f"Failed to get cluster status for {self.cluster_name}") - def get_cluster_node_count(self, from_instance: Optional[str] = None) -> int: + def get_cluster_set_status( + self, extended: Optional[int] = 1, from_instance: Optional[str] = None + ) -> Optional[dict]: + """Get the cluster-set status. + + Executes script to retrieve cluster-set status. + Won't raise errors. + + Args: + extended: whether to return extended status (default: 1) + from_instance: member instance to run the command from (fallback to current) + + Returns: + Cluster-set status as a dictionary, + or None if running the status script fails. 
+ """ + options = {"extended": extended} + status_commands = ( + f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{from_instance or self.instance_address}')", + "cs = dba.get_cluster_set()", + f"print(cs.status({options}))", + ) + + try: + output = self._run_mysqlsh_script("\n".join(status_commands), timeout=150) + output_dict = json.loads(output.lower()) + return output_dict + except MySQLClientError: + logger.warning("Failed to get cluster set status") + + def get_cluster_names(self) -> set[str]: + """Get the names of the clusters in the cluster set. + + Returns: + A set of cluster names + """ + status = self.get_cluster_set_status() + if not status: + return set() + return set(status["clusters"]) + + def get_replica_cluster_status(self, replica_cluster_name: Optional[str] = None) -> str: + """Get the replica cluster status. + + Executes script to retrieve replica cluster status. + Won't raise errors. + + Returns: + Replica cluster status as a string + """ + if not replica_cluster_name: + replica_cluster_name = self.cluster_name + status_commands = ( + f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.instance_address}')", + "cs = dba.get_cluster_set()", + f"print(cs.status(extended=1)['clusters']['{replica_cluster_name}']['globalStatus'])", + ) + + try: + output = self._run_mysqlsh_script("\n".join(status_commands), timeout=30) + return output.lower().strip() + except MySQLClientError: + logger.warning(f"Failed to get replica cluster status for {replica_cluster_name}") + return "unknown" + + def get_cluster_node_count( + self, from_instance: Optional[str] = None, node_status: Optional[MySQLMemberState] = None + ) -> int: """Retrieve current count of cluster nodes. + Args: + from_instance: member instance to run the command from (fallback to current) + node_status: status of the nodes to count + Returns: Amount of cluster nodes. 
""" + if not node_status: + query = "SELECT COUNT(*) FROM performance_schema.replication_group_members" + else: + query = ( + "SELECT COUNT(*) FROM performance_schema.replication_group_members" + f" WHERE member_state = '{node_status.value.upper()}'" + ) size_commands = ( f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}" f"@{from_instance or self.instance_address}')", - 'result = session.run_sql("SELECT COUNT(*) FROM performance_schema.replication_group_members")', + f'result = session.run_sql("{query}")', 'print(f"{result.fetch_one()[0]}")', ) try: output = self._run_mysqlsh_script("\n".join(size_commands)) - except MySQLClientError as e: - logger.warning("Failed to get node count", exc_info=e) + except MySQLClientError: + logger.warning("Failed to get node count") return 0 matches = re.search(r"(\d)", output) @@ -1517,12 +1918,12 @@ def _get_host_ip(host: str) -> str: ro_endpoints = { _get_host_ip(v["address"]) if get_ips else v["address"] for v in topology.values() - if v["memberrole"] == "secondary" and v["status"] == MySQLMemberState.ONLINE + if v["mode"] == "r/o" and v["status"] == MySQLMemberState.ONLINE } rw_endpoints = { _get_host_ip(v["address"]) if get_ips else v["address"] for v in topology.values() - if v["memberrole"] == "primary" and v["status"] == MySQLMemberState.ONLINE + if v["mode"] == "r/w" and v["status"] == MySQLMemberState.ONLINE } # won't get offline endpoints to IP as they maybe unreachable no_endpoints = { @@ -1537,7 +1938,7 @@ def _get_host_ip(host: str) -> str: reraise=True, wait=wait_random(min=4, max=30), ) - def remove_instance(self, unit_label: str) -> None: + def remove_instance(self, unit_label: str, lock_instance: Optional[str] = None) -> None: """Remove instance from the cluster. This method is called from each unit being torn down, thus we must obtain @@ -1554,6 +1955,8 @@ def remove_instance(self, unit_label: str) -> None: Args: unit_label: The label for this unit's instance (to be torn down) """ + remaining_cluster_member_addresses = list() + skip_release_lock = False try: # Get the cluster primary's address to direct lock acquisition request to. primary_address = self.get_cluster_primary_address() @@ -1563,37 +1966,55 @@ def remove_instance(self, unit_label: str) -> None: ) # Attempt to acquire a lock on the primary instance - acquired_lock = self._acquire_lock(primary_address, unit_label, UNIT_TEARDOWN_LOCKNAME) + acquired_lock = self._acquire_lock( + lock_instance or primary_address, unit_label, UNIT_TEARDOWN_LOCKNAME + ) if not acquired_lock: + logger.debug(f"Failed to acquire lock to remove unit {unit_label}. 
Retrying.") raise MySQLRemoveInstanceRetryError("Did not acquire lock to remove unit") - # Get remaining cluster member addresses before calling mysqlsh.remove_instance() - remaining_cluster_member_addresses, valid = self._get_cluster_member_addresses( - exclude_unit_labels=[unit_label] - ) - if not valid: - raise MySQLRemoveInstanceRetryError("Unable to retrieve cluster member addresses") - # Remove instance from cluster, or dissolve cluster if no other members remain logger.debug( f"Removing instance {self.instance_address} from cluster {self.cluster_name}" ) - remove_instance_options = { - "password": self.cluster_admin_password, - "force": "true", - } - dissolve_cluster_options = { - "force": "true", - } - remove_instance_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.instance_address}')", - f"cluster = dba.get_cluster('{self.cluster_name}')", - "number_cluster_members = len(cluster.status()['defaultReplicaSet']['topology'])", - f"cluster.remove_instance('{self.cluster_admin_user}@{self.instance_address}', " - f"{json.dumps(remove_instance_options)}) if number_cluster_members > 1 else" - f" cluster.dissolve({json.dumps(dissolve_cluster_options)})", - ) - self._run_mysqlsh_script("\n".join(remove_instance_commands)) + + if self.get_cluster_node_count() == 1: + # Last instance in the cluster, dissolve the cluster + cluster_names = self.get_cluster_names() + if len(cluster_names) > 1 and not self.is_cluster_replica(): + # when last instance from a primary cluster belonging to a cluster set + # promote another cluster to primary prior to dissolving + another_cluster = (cluster_names - {self.cluster_name}).pop() + self.promote_cluster_to_primary(another_cluster) + # update lock instance + lock_instance = self.get_cluster_set_global_primary_address() + self.remove_replica_cluster(self.cluster_name) + else: + skip_release_lock = True + self.dissolve_cluster() + + else: + # Get remaining cluster member addresses before calling mysqlsh.remove_instance() + remaining_cluster_member_addresses, valid = self._get_cluster_member_addresses( + exclude_unit_labels=[unit_label] + ) + if not valid: + raise MySQLRemoveInstanceRetryError( + "Unable to retrieve cluster member addresses" + ) + + # Just remove instance + remove_instance_options = { + "password": self.cluster_admin_password, + "force": "true", + } + remove_instance_commands = ( + f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.instance_address}')", + f"cluster = dba.get_cluster('{self.cluster_name}')", + "cluster.remove_instance(" + f"'{self.cluster_admin_user}@{self.instance_address}', {remove_instance_options})", + ) + self._run_mysqlsh_script("\n".join(remove_instance_commands)) except MySQLClientError as e: # In case of an error, raise an error and retry logger.warning( @@ -1601,31 +2022,44 @@ def remove_instance(self, unit_label: str) -> None: exc_info=e, ) raise MySQLRemoveInstanceRetryError(e.message) + finally: + # There is no need to release the lock if single cluster was dissolved + if skip_release_lock: + return - # There is no need to release the lock if cluster was dissolved - if not remaining_cluster_member_addresses: - return - - # The below code should not result in retries of this method since the - # instance would already be removed from the cluster. 
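# ---------------------------------------------------------------------------
# Editor's note - illustrative sketch, not part of the patch. It distils the
# branch added to remove_instance() above for the last unit of a cluster, with
# lock acquisition/release elided; "mysql" stands for any MySQLBase
# implementation exposing the methods introduced in this library.
def _remove_last_instance_sketch(mysql, cluster_name: str) -> None:
    if mysql.get_cluster_node_count() == 1:
        cluster_names = mysql.get_cluster_names()
        if len(cluster_names) > 1 and not mysql.is_cluster_replica():
            # Last unit of the primary cluster in a multi-cluster set:
            # hand the primary role to another cluster, then drop this one.
            another_cluster = (cluster_names - {cluster_name}).pop()
            mysql.promote_cluster_to_primary(another_cluster)
            mysql.remove_replica_cluster(cluster_name)
        else:
            # Standalone cluster, or a standby: dissolve it locally.
            mysql.dissolve_cluster()
# ---------------------------------------------------------------------------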
- try: - # Retrieve the cluster primary's address again (in case the old primary is scaled down) - # Release the lock by making a request to this primary member's address - primary_address = self.get_cluster_primary_address( - connect_instance_address=remaining_cluster_member_addresses[0] - ) - if not primary_address: - raise MySQLRemoveInstanceError( - "Unable to retrieve the address of the cluster primary" - ) - - self._release_lock(primary_address, unit_label, UNIT_TEARDOWN_LOCKNAME) - except MySQLClientError as e: - # Raise an error that does not lead to a retry of this method - logger.exception( - f"Failed to release lock on {unit_label} with error {e.message}", exc_info=e - ) - raise MySQLRemoveInstanceError(e.message) + try: + if not lock_instance: + if len(remaining_cluster_member_addresses) == 0: + raise MySQLRemoveInstanceRetryError( + "No remaining instance to query cluster primary from." + ) + + # Retrieve the cluster primary's address again (in case the old primary is + # scaled down) + # Release the lock by making a request to this primary member's address + lock_instance = self.get_cluster_primary_address( + connect_instance_address=remaining_cluster_member_addresses[0] + ) + if not lock_instance: + raise MySQLRemoveInstanceError( + "Unable to retrieve the address of the cluster primary" + ) + + self._release_lock(lock_instance, unit_label, UNIT_TEARDOWN_LOCKNAME) + except MySQLClientError as e: + # Raise an error that does not lead to a retry of this method + logger.exception(f"Failed to release lock on {unit_label}") + raise MySQLRemoveInstanceError(e.message) + + def dissolve_cluster(self) -> None: + """Dissolve the cluster independently of the unit teardown process.""" + logger.debug(f"Dissolving cluster {self.cluster_name}") + dissolve_cluster_commands = ( + f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"cluster = dba.get_cluster('{self.cluster_name}')", + "cluster.dissolve({'force': 'true'})", + ) + self._run_mysqlsh_script("\n".join(dissolve_cluster_commands)) def _acquire_lock(self, primary_address: str, unit_label: str, lock_name: str) -> bool: """Attempts to acquire a lock by using the mysql.juju_units_operations table. @@ -1655,7 +2089,7 @@ def _acquire_lock(self, primary_address: str, unit_label: str, lock_name: str) - try: output = self._run_mysqlsh_script("\n".join(acquire_lock_commands)) except MySQLClientError: - logger.debug("Failed to acquire lock") + logger.debug(f"Failed to acquire lock {lock_name}") return False matches = re.search(r"(\d)", output) if not matches: @@ -1678,7 +2112,8 @@ def _release_lock(self, primary_address: str, unit_label: str, lock_name: str) - release_lock_commands = ( f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{primary_address}')", - f"session.run_sql(\"UPDATE mysql.juju_units_operations SET executor='', status='not-started' WHERE task='{lock_name}' AND executor='{unit_label}';\")", + "session.run_sql(\"UPDATE mysql.juju_units_operations SET executor='', status='not-started'" + f" WHERE task='{lock_name}' AND executor='{unit_label}';\")", ) self._run_mysqlsh_script("\n".join(release_lock_commands)) @@ -1719,9 +2154,8 @@ def get_cluster_primary_address( ) -> Optional[str]: """Get the cluster primary's address. 
- Keyword args: - connect_instance_address: The address for the cluster primary - (default to this instance's address) + Args: + connect_instance_address: address for a cluster instance to query from Returns: The address of the cluster's primary @@ -1748,6 +2182,41 @@ def get_cluster_primary_address( return matches.group(1) + def get_cluster_set_global_primary_address( + self, connect_instance_address: Optional[str] = None + ) -> Optional[str]: + """Get the cluster set global primary's address. + + The global primary is the primary instance on the primary cluster set. + + Args: + connect_instance_address: address for a cluster instance to query from + """ + if not connect_instance_address: + connect_instance_address = self.instance_address + logger.debug( + f"Getting cluster set global primary member's address from {connect_instance_address}" + ) + + get_cluster_set_global_primary_commands = ( + f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{connect_instance_address}')", + "cs = dba.get_cluster_set()", + "global_primary = cs.status()['globalPrimaryInstance']", + "print(f'{global_primary}')", + ) + + try: + output = self._run_mysqlsh_script("\n".join(get_cluster_set_global_primary_commands)) + except MySQLClientError as e: + logger.warning("Failed to get cluster set global primary addresses", exc_info=e) + raise MySQLGetClusterPrimaryAddressError(e.message) + matches = re.search(r"(.+)", output) + + if not matches: + return None + + return matches.group(1) + def get_primary_label(self) -> Optional[str]: """Get the label of the cluster's primary.""" status = self.get_cluster_status() @@ -1915,7 +2384,7 @@ def update_user_password(self, username: str, new_password: str, host: str = "%" logger.debug(f"Updating password for {username}.") update_user_password_commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", f"session.run_sql(\"ALTER USER '{username}'@'{host}' IDENTIFIED BY '{new_password}';\")", 'session.run_sql("FLUSH PRIVILEGES;")', ) @@ -1976,6 +2445,64 @@ def get_member_state(self) -> Tuple[str, str]: raise MySQLGetMemberStateError("No member state retrieved") + def is_cluster_replica(self, from_instance: Optional[str] = None) -> Optional[bool]: + """Check if cluster is a replica. + + Args: + from_instance: The instance to run the command from (optional) + + Returns: + True if cluster is a replica, False otherwise. + """ + cs_status = self.get_cluster_set_status(extended=0, from_instance=from_instance) + if not cs_status: + return + + return cs_status["clusters"][self.cluster_name.lower()]["clusterrole"] == "replica" + + def cluster_set_cluster_count(self, from_instance: Optional[str] = None) -> int: + """Get the number of clusters in the cluster set. + + Args: + from_instance: The instance to run the command from (optional) + + Returns: + The number of clusters in the cluster set. + """ + cs_status = self.get_cluster_set_status(extended=0, from_instance=from_instance) + if not cs_status: + return 0 + + return len(cs_status["clusters"]) + + def get_cluster_set_name(self, from_instance: Optional[str] = None) -> Optional[str]: + """Get cluster set name. + + Args: + from_instance: The instance to run the command from (optional) + + Returns: + The cluster set name. 
+ """ + cs_status = self.get_cluster_set_status(extended=0, from_instance=from_instance) + if not cs_status: + return None + + return cs_status["domainname"] + + def stop_group_replication(self) -> None: + """Stop Group replication if enabled on the instance.""" + stop_gr_command = ( + f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + "data = session.run_sql('SELECT 1 FROM performance_schema.replication_group_members')", + "if len(data.fetch_all()) > 0:", + " session.run_sql('STOP GROUP_REPLICATION')", + ) + try: + self._run_mysqlsh_script("\n".join(stop_gr_command)) + except MySQLClientError: + logger.debug("Failed to stop Group Replication for unit") + def reboot_from_complete_outage(self) -> None: """Wrapper for reboot_cluster_from_complete_outage command.""" reboot_from_outage_command = ( @@ -2562,10 +3089,31 @@ def flush_mysql_logs(self, logs_type: Union[MySQLTextLogs, list[MySQLTextLogs]]) flush_logs_commands.append(f'session.run_sql("FLUSH {logs_type.value}")') # type: ignore try: - self._run_mysqlsh_script("\n".join(flush_logs_commands)) + self._run_mysqlsh_script("\n".join(flush_logs_commands), timeout=50) except MySQLClientError: logger.exception(f"Failed to flush {logs_type} logs.") + def get_databases(self) -> set[str]: + """Return a set with all databases on the server.""" + list_databases_commands = ( + f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + 'result = session.run_sql("SHOW DATABASES")', + "for db in result.fetch_all():\n print(db[0])", + ) + + output = self._run_mysqlsh_script("\n".join(list_databases_commands)) + return set(output.split()) + + def get_non_system_databases(self) -> set[str]: + """Return a set with all non system databases on the server.""" + return self.get_databases() - { + "information_schema", + "mysql", + "mysql_innodb_cluster_metadata", + "performance_schema", + "sys", + } + @abstractmethod def is_mysqld_running(self) -> bool: """Returns whether mysqld is running.""" @@ -2599,6 +3147,11 @@ def wait_until_mysql_connection(self, check_port: bool = True) -> None: """ raise NotImplementedError + @abstractmethod + def reset_data_dir(self) -> None: + """Reset the data directory.""" + raise NotImplementedError + @abstractmethod def _run_mysqlsh_script(self, script: str, timeout: Optional[int] = None) -> str: """Execute a MySQL shell script. 
@@ -2635,8 +3188,3 @@ def _run_mysqlcli_script( timeout: (optional) time before the query should timeout """ raise NotImplementedError - - @abstractmethod - def reset_data_dir(self) -> None: - """Reset the data directory.""" - raise NotImplementedError diff --git a/metadata.yaml b/metadata.yaml index ef696324d..5955b2269 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -53,6 +53,8 @@ provides: interface: prometheus_scrape grafana-dashboard: interface: grafana_dashboard + async-primary: + interface: async_replication requires: certificates: @@ -67,6 +69,10 @@ requires: interface: loki_push_api limit: 1 optional: true + async-replica: + interface: async_replication + limit: 1 + optional: true storage: database: diff --git a/pyproject.toml b/pyproject.toml index 37003e9cf..2f0230f59 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,7 +83,8 @@ show_missing = true [tool.pytest.ini_options] minversion = "6.0" log_cli_level = "INFO" -markers = ["unstable"] +markers = ["unstable", "juju3", "only_with_juju_secrets", "only_without_juju_secrets"] +asyncio_mode = "auto" # Formatting tools configuration [tool.black] @@ -93,6 +94,7 @@ target-version = ["py38"] [tool.isort] profile = "black" known_third_party = "mysql.connector" +line_length = 99 # Linting tools configuration [tool.flake8] diff --git a/src/charm.py b/src/charm.py index e4ceaa069..9619a64aa 100755 --- a/src/charm.py +++ b/src/charm.py @@ -5,7 +5,9 @@ """Charm for MySQL.""" import logging +import random from socket import getfqdn +from time import sleep from typing import Optional import ops @@ -13,6 +15,10 @@ from charms.data_platform_libs.v0.s3 import S3Requirer from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider from charms.loki_k8s.v0.loki_push_api import LogProxyConsumer +from charms.mysql.v0.async_replication import ( + MySQLAsyncReplicationPrimary, + MySQLAsyncReplicationReplica, +) from charms.mysql.v0.backups import MySQLBackups from charms.mysql.v0.mysql import ( BYTES_1MB, @@ -33,13 +39,7 @@ from ops import EventBase, RelationBrokenEvent, RelationCreatedEvent from ops.charm import RelationChangedEvent, UpdateStatusEvent from ops.main import main -from ops.model import ( - ActiveStatus, - BlockedStatus, - Container, - MaintenanceStatus, - WaitingStatus, -) +from ops.model import ActiveStatus, BlockedStatus, Container, MaintenanceStatus, WaitingStatus from ops.pebble import Layer from config import CharmConfig, MySQLConfig @@ -76,7 +76,7 @@ from relations.mysql_root import MySQLRootRelation from rotate_mysql_logs import RotateMySQLLogs, RotateMySQLLogsCharmEvents from upgrade import MySQLK8sUpgrade, get_mysql_k8s_dependencies_model -from utils import compare_dictionaries, generate_random_hash, generate_random_password +from utils import compare_dictionaries, generate_random_password logger = logging.getLogger(__name__) @@ -88,7 +88,7 @@ class MySQLOperatorCharm(MySQLCharmBase, TypedCharmBase[CharmConfig]): # RotateMySQLLogsCharmEvents needs to be defined on the charm object for # the log rotate manager process (which runs juju-run/juju-exec to dispatch # a custom event) - on = RotateMySQLLogsCharmEvents() + on = RotateMySQLLogsCharmEvents() # pyright: ignore [reportAssignmentType] def __init__(self, *args): super().__init__(*args) @@ -144,6 +144,8 @@ def __init__(self, *args): self.log_rotate_manager.start_log_rotate_manager() self.rotate_mysql_logs = RotateMySQLLogs(self) + self.async_primary = MySQLAsyncReplicationPrimary(self) + self.async_replica = MySQLAsyncReplicationReplica(self) @property 
def _mysql(self) -> MySQL: @@ -152,15 +154,21 @@ def _mysql(self) -> MySQL: self._get_unit_fqdn(), self.app_peer_data["cluster-name"], self.app_peer_data["cluster-set-domain-name"], - self.get_secret("app", ROOT_PASSWORD_KEY), + self.get_secret("app", ROOT_PASSWORD_KEY), # pyright: ignore [reportArgumentType] SERVER_CONFIG_USERNAME, - self.get_secret("app", SERVER_CONFIG_PASSWORD_KEY), + self.get_secret( + "app", SERVER_CONFIG_PASSWORD_KEY + ), # pyright: ignore [reportArgumentType] CLUSTER_ADMIN_USERNAME, - self.get_secret("app", CLUSTER_ADMIN_PASSWORD_KEY), + self.get_secret( + "app", CLUSTER_ADMIN_PASSWORD_KEY + ), # pyright: ignore [reportArgumentType] MONITORING_USERNAME, - self.get_secret("app", MONITORING_PASSWORD_KEY), + self.get_secret( + "app", MONITORING_PASSWORD_KEY + ), # pyright: ignore [reportArgumentType] BACKUPS_USERNAME, - self.get_secret("app", BACKUPS_PASSWORD_KEY), + self.get_secret("app", BACKUPS_PASSWORD_KEY), # pyright: ignore [reportArgumentType] self.unit.get_container(CONTAINER_NAME), self.k8s_helpers, self, @@ -199,20 +207,18 @@ def _pebble_layer(self) -> Layer: }, }, } - return Layer(layer) - - @property - def active_status_message(self) -> str: - """Active status message.""" - if self.unit_peer_data.get("member-role") == "primary": - return "Primary" - return "" + return Layer(layer) # pyright: ignore [reportArgumentType] @property def restart_peers(self) -> Optional[ops.model.Relation]: """Retrieve the peer relation.""" return self.model.get_relation("restart") + @property + def unit_address(self) -> str: + """Return the address of this unit.""" + return self._get_unit_fqdn() + def get_unit_hostname(self, unit_name: Optional[str] = None) -> str: """Get the hostname.localdomain for a unit. @@ -268,6 +274,7 @@ def _is_unit_waiting_to_join_cluster(self) -> bool: and self.unit_peer_data.get("member-state") == "waiting" and self._mysql.is_data_dir_initialised() and not self.unit_peer_data.get("unit-initialized") + and int(self.app_peer_data.get("units-added-to-cluster", 0)) > 0 ) def join_unit_to_cluster(self) -> None: @@ -276,55 +283,71 @@ def join_unit_to_cluster(self) -> None: Try to join the unit from the primary unit. """ instance_label = self.unit.name.replace("/", "-") - instance_fqdn = self._get_unit_fqdn(self.unit.name) + instance_address = self._get_unit_fqdn(self.unit.name) - if self._mysql.is_instance_in_cluster(instance_label): - logger.debug("instance already in cluster") - return - - # Add new instance to the cluster - try: - cluster_primary = self._get_primary_from_online_peer() - if not cluster_primary: - self.unit.status = WaitingStatus("waiting to get cluster primary from peers") - logger.debug("waiting: unable to retrieve the cluster primary from peers") - return + if not self._mysql.is_instance_in_cluster(instance_label): + # Add new instance to the cluster + try: + cluster_primary = self._get_primary_from_online_peer() + if not cluster_primary: + self.unit.status = WaitingStatus("waiting to get cluster primary from peers") + logger.debug("waiting: unable to retrieve the cluster primary from peers") + return + + if ( + self._mysql.get_cluster_node_count(from_instance=cluster_primary) + == GR_MAX_MEMBERS + ): + self.unit.status = WaitingStatus( + f"Cluster reached max size of {GR_MAX_MEMBERS} units. Standby." + ) + logger.warning( + f"Cluster reached max size of {GR_MAX_MEMBERS} units. This unit will stay as standby." + ) + return + + # If instance is part of a replica cluster, locks are managed by the + # the primary cluster primary (i.e. 
cluster set global primary) + lock_instance = None + if self._mysql.is_cluster_replica(from_instance=cluster_primary): + lock_instance = self._mysql.get_cluster_set_global_primary_address( + connect_instance_address=cluster_primary + ) - if self._mysql.get_cluster_node_count(from_instance=cluster_primary) == GR_MAX_MEMBERS: - self.unit.status = WaitingStatus( - f"Cluster reached max size of {GR_MAX_MEMBERS} units. Standby." - ) - logger.warning( - f"Cluster reached max size of {GR_MAX_MEMBERS} units. This unit will stay as standby." + # add random delay to mitigate collisions when multiple units are joining + # due the difference between the time we test for locks and acquire them + sleep(random.uniform(0, 1.5)) + + if self._mysql.are_locks_acquired(from_instance=lock_instance or cluster_primary): + self.unit.status = WaitingStatus("waiting to join the cluster") + logger.debug("waiting: cluster lock is held") + return + + self.unit.status = MaintenanceStatus("joining the cluster") + + # Stop GR for cases where the instance was previously part of the cluster + # harmless otherwise + self._mysql.stop_group_replication() + self._mysql.add_instance_to_cluster( + instance_address=instance_address, + instance_unit_label=instance_label, + from_instance=cluster_primary, + lock_instance=lock_instance, ) + logger.debug(f"Added instance {instance_address} to cluster") + except MySQLAddInstanceToClusterError: + logger.debug(f"Unable to add instance {instance_address} to cluster.") return - - if self._mysql.are_locks_acquired(from_instance=cluster_primary): + except MySQLLockAcquisitionError: self.unit.status = WaitingStatus("waiting to join the cluster") - logger.debug("waiting: cluster lock is held") + logger.debug("waiting: failed to acquire lock when adding instance to cluster") return - self.unit.status = MaintenanceStatus("joining the cluster") - - # Stop GR for cases where the instance was previously part of the cluster - # harmless otherwise - self._mysql.stop_group_replication() - self._mysql.add_instance_to_cluster( - instance_fqdn, instance_label, from_instance=cluster_primary - ) - logger.debug(f"Added instance {instance_fqdn} to cluster") - - # Update 'units-added-to-cluster' counter in the peer relation databag - self.unit_peer_data["unit-initialized"] = "True" - self.unit_peer_data["member-state"] = "online" - self.unit.status = ActiveStatus(self.active_status_message) - logger.debug(f"Instance {instance_label} is cluster member") - - except MySQLAddInstanceToClusterError: - logger.debug(f"Unable to add instance {instance_fqdn} to cluster.") - except MySQLLockAcquisitionError: - self.unit.status = WaitingStatus("waiting to join the cluster") - logger.debug("waiting: failed to acquire lock when adding instance to cluster") + # Update 'units-added-to-cluster' counter in the peer relation databag + self.unit_peer_data["unit-initialized"] = "True" + self.unit_peer_data["member-state"] = "online" + self.unit.status = ActiveStatus(self.active_status_message) + logger.debug(f"Instance {instance_label} is cluster member") def _reconcile_pebble_layer(self, container: Container) -> None: """Reconcile pebble layer.""" @@ -462,11 +485,13 @@ def _on_leader_elected(self, _) -> None: ) # Create and set cluster and cluster-set names in the peer relation databag - common_hash = generate_random_hash() + common_hash = self.generate_random_hash() self.app_peer_data.setdefault( "cluster-name", self.config.cluster_name or f"cluster-{common_hash}" ) - self.app_peer_data.setdefault("cluster-set-domain-name", 
f"cluster-set-{common_hash}") + self.app_peer_data.setdefault( + "cluster-set-domain-name", self.config.cluster_set_name or f"cluster-set-{common_hash}" + ) def _open_ports(self) -> None: """Open ports if supported. @@ -506,7 +531,7 @@ def _configure_instance(self, container) -> None: logger.info("Configuring instance") # Configure all base users and revoke privileges from the root users - self._mysql.configure_mysql_users() + self._mysql.configure_mysql_users(password_needed=False) # Configure instance as a cluster node self._mysql.configure_instance() @@ -576,7 +601,7 @@ def _on_mysql_pebble_ready(self, event) -> None: self._configure_instance(container) if not self.unit.is_leader(): - # Non-leader units should wait for leader to add them to the cluster + # Non-leader units try to join cluster self.unit.status = WaitingStatus("Waiting for instance to join the cluster") self.unit_peer_data.update({"member-role": "secondary", "member-state": "waiting"}) self.join_unit_to_cluster() @@ -584,20 +609,10 @@ def _on_mysql_pebble_ready(self, event) -> None: try: # Create the cluster when is the leader unit - logger.info("Creating cluster on the leader unit") - self._mysql.create_cluster(self.unit_label) - self._mysql.create_cluster_set() - - self._mysql.initialize_juju_units_operations_table() - # Start control flag - self.app_peer_data["units-added-to-cluster"] = "1" - - state, role = self._mysql.get_member_state() - self.unit_peer_data.update( - {"member-state": state, "member-role": role, "unit-initialized": "True"} - ) + logger.info(f"Creating cluster {self.app_peer_data['cluster-name']}") + self.create_cluster() + self.unit.status = ops.ActiveStatus(self.active_status_message) - self.unit.status = ActiveStatus(self.active_status_message) except ( MySQLCreateClusterError, MySQLGetMemberStateError, @@ -676,7 +691,8 @@ def _is_cluster_blocked(self) -> bool: logger.info(f"Unit state is {unit_member_state}") return True - return False + # avoid changing status while async replication is setting up + return not (self.async_replica.idle and self.async_primary.idle) def _on_update_status(self, _: Optional[UpdateStatusEvent]) -> None: """Handle the update status event.""" @@ -758,9 +774,15 @@ def _on_database_storage_detaching(self, _) -> None: except MySQLSetClusterPrimaryError: logger.warning("Failed to switch primary to unit 0") + # If instance is part of a replica cluster, locks are managed by the + # the primary cluster primary (i.e. 
cluster set global primary) + lock_instance = None + if self._mysql.is_cluster_replica(): + lock_instance = self._mysql.get_cluster_set_global_primary_address() + # The following operation uses locks to ensure that only one instance is removed # from the cluster at a time (to avoid split-brain or lack of majority issues) - self._mysql.remove_instance(self.unit_label) + self._mysql.remove_instance(self.unit_label, lock_instance=lock_instance) # Inform other hooks of current status self.unit_peer_data["unit-status"] = "removing" diff --git a/src/config.py b/src/config.py index 5ddaa7e7f..c4ec5d029 100644 --- a/src/config.py +++ b/src/config.py @@ -47,6 +47,7 @@ class CharmConfig(BaseConfigModel): profile: str cluster_name: Optional[str] + cluster_set_name: Optional[str] profile_limit_memory: Optional[int] mysql_interface_user: Optional[str] mysql_interface_database: Optional[str] @@ -62,23 +63,23 @@ def profile_values(cls, value: str) -> Optional[str]: return value - @validator("cluster_name") + @validator("cluster_name", "cluster_set_name") @classmethod def cluster_name_validator(cls, value: str) -> Optional[str]: - """Check for valid cluster name. + """Check for valid cluster, cluster-set name. Limited to 63 characters, and must start with a letter and contain only alphanumeric characters, `-`, `_` and `.` """ if len(value) > 63: - raise ValueError("Cluster name must be less than 63 characters") + raise ValueError("cluster, cluster-set name must be less than 63 characters") if not value[0].isalpha(): - raise ValueError("Cluster name must start with a letter") + raise ValueError("cluster, cluster-set name must start with a letter") if not re.match(r"^[a-zA-Z0-9-_.]*$", value): raise ValueError( - "Cluster name must contain only alphanumeric characters, " + "cluster, cluster-set name must contain only alphanumeric characters, " "hyphens, underscores and periods" ) diff --git a/src/mysql_k8s_helpers.py b/src/mysql_k8s_helpers.py index 4370eddfd..ca22f9ffd 100644 --- a/src/mysql_k8s_helpers.py +++ b/src/mysql_k8s_helpers.py @@ -13,7 +13,6 @@ Error, MySQLBase, MySQLClientError, - MySQLConfigureMySQLUsersError, MySQLExecError, MySQLGetClusterEndpointsError, MySQLServiceNotRunningError, @@ -256,56 +255,6 @@ def setup_logrotate_config(self) -> None: group=ROOT_SYSTEM_USER, ) - def configure_mysql_users(self) -> None: - """Configure the MySQL users for the instance. - - Creates base `root@%` and `@%` users with the - appropriate privileges, and reconfigure `root@localhost` user password. - - Raises MySQLConfigureMySQLUsersError if the user creation fails. 
- """ - # SYSTEM_USER and SUPER privileges to revoke from the root users - # Reference: https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_super - privileges_to_revoke = ( - "SYSTEM_USER", - "SYSTEM_VARIABLES_ADMIN", - "SUPER", - "REPLICATION_SLAVE_ADMIN", - "GROUP_REPLICATION_ADMIN", - "BINLOG_ADMIN", - "SET_USER_ID", - "ENCRYPTION_KEY_ADMIN", - "VERSION_TOKEN_ADMIN", - "CONNECTION_ADMIN", - ) - - # Configure root@%, root@localhost and serverconfig@% users - configure_users_commands = ( - f"CREATE USER 'root'@'%' IDENTIFIED BY '{self.root_password}'", - "GRANT ALL ON *.* TO 'root'@'%' WITH GRANT OPTION", - f"CREATE USER '{self.server_config_user}'@'%' IDENTIFIED BY '{self.server_config_password}'", - f"GRANT ALL ON *.* TO '{self.server_config_user}'@'%' WITH GRANT OPTION", - f"CREATE USER '{self.monitoring_user}'@'%' IDENTIFIED BY '{self.monitoring_password}' WITH MAX_USER_CONNECTIONS 3", - f"GRANT SYSTEM_USER, SELECT, PROCESS, SUPER, REPLICATION CLIENT, RELOAD ON *.* TO '{self.monitoring_user}'@'%'", - f"CREATE USER '{self.backups_user}'@'%' IDENTIFIED BY '{self.backups_password}'", - f"GRANT CONNECTION_ADMIN, BACKUP_ADMIN, PROCESS, RELOAD, LOCK TABLES, REPLICATION CLIENT ON *.* TO '{self.backups_user}'@'%'", - f"GRANT SELECT ON performance_schema.log_status TO '{self.backups_user}'@'%'", - f"GRANT SELECT ON performance_schema.keyring_component_status TO '{self.backups_user}'@'%'", - f"GRANT SELECT ON performance_schema.replication_group_members TO '{self.backups_user}'@'%'", - "UPDATE mysql.user SET authentication_string=null WHERE User='root' and Host='localhost'", - f"ALTER USER 'root'@'localhost' IDENTIFIED BY '{self.root_password}'", - f"REVOKE {', '.join(privileges_to_revoke)} ON *.* FROM 'root'@'%'", - f"REVOKE {', '.join(privileges_to_revoke)} ON *.* FROM 'root'@'localhost'", - "FLUSH PRIVILEGES", - ) - - try: - logger.debug("Configuring users") - self._run_mysqlcli_script("; ".join(configure_users_commands)) - except MySQLClientError as e: - logger.exception("Error configuring MySQL users", exc_info=e) - raise MySQLConfigureMySQLUsersError(e.message) - def execute_backup_commands( self, s3_directory: str, @@ -591,19 +540,6 @@ def restart_mysql_exporter(self) -> None: """Restarts the mysqld exporter service in pebble.""" self.charm._reconcile_pebble_layer(self.container) - def stop_group_replication(self) -> None: - """Stop Group replication if enabled on the instance.""" - stop_gr_command = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", - "data = session.run_sql('SELECT 1 FROM performance_schema.replication_group_members')", - "if len(data.fetch_all()) > 0:", - " session.run_sql('STOP GROUP_REPLICATION')", - ) - try: - self._run_mysqlsh_script("\n".join(stop_gr_command)) - except ExecError: - logger.debug("Failed to stop Group Replication for unit") - def _execute_commands( self, commands: List[str], @@ -673,8 +609,8 @@ def _run_mysqlsh_script( def _run_mysqlcli_script( self, script: str, - password: Optional[str] = None, user: str = "root", + password: Optional[str] = None, timeout: Optional[int] = None, ) -> str: """Execute a MySQL CLI script. 
diff --git a/src/relations/mysql.py b/src/relations/mysql.py index 2ed08d182..f8a3a5b70 100644 --- a/src/relations/mysql.py +++ b/src/relations/mysql.py @@ -16,13 +16,7 @@ from ops.framework import Object from ops.model import ActiveStatus, BlockedStatus -from constants import ( - CONTAINER_NAME, - LEGACY_MYSQL, - PASSWORD_LENGTH, - PEER, - ROOT_PASSWORD_KEY, -) +from constants import CONTAINER_NAME, LEGACY_MYSQL, PASSWORD_LENGTH, PEER, ROOT_PASSWORD_KEY from utils import generate_random_password logger = logging.getLogger(__name__) diff --git a/src/relations/mysql_provider.py b/src/relations/mysql_provider.py index ce36cd8dc..398bb8b10 100644 --- a/src/relations/mysql_provider.py +++ b/src/relations/mysql_provider.py @@ -7,10 +7,7 @@ import socket import typing -from charms.data_platform_libs.v0.data_interfaces import ( - DatabaseProvides, - DatabaseRequestedEvent, -) +from charms.data_platform_libs.v0.data_interfaces import DatabaseProvides, DatabaseRequestedEvent from charms.mysql.v0.mysql import ( MySQLCreateApplicationDatabaseAndScopedUserError, MySQLDeleteUserError, @@ -23,13 +20,7 @@ from ops.framework import Object from ops.model import ActiveStatus, BlockedStatus -from constants import ( - CONTAINER_NAME, - CONTAINER_RESTARTS, - DB_RELATION_NAME, - PASSWORD_LENGTH, - PEER, -) +from constants import CONTAINER_NAME, CONTAINER_RESTARTS, DB_RELATION_NAME, PASSWORD_LENGTH, PEER from k8s_helpers import KubernetesClientError from utils import generate_random_password @@ -243,7 +234,7 @@ def _on_database_broken(self, event: RelationBrokenEvent) -> None: # run once by the leader return - if self.charm.unit_peer_data.get("unit-status", None) == "removing": + if self.charm.removing_unit: # safeguard against relation broken being triggered for # a unit being torn down (instead of un-related). 
See: # https://bugs.launchpad.net/juju/+bug/1979811 diff --git a/src/relations/mysql_root.py b/src/relations/mysql_root.py index d18c276d2..0d7021fac 100644 --- a/src/relations/mysql_root.py +++ b/src/relations/mysql_root.py @@ -7,10 +7,7 @@ import logging import typing -from charms.mysql.v0.mysql import ( - MySQLCheckUserExistenceError, - MySQLDeleteUsersForUnitError, -) +from charms.mysql.v0.mysql import MySQLCheckUserExistenceError, MySQLDeleteUsersForUnitError from ops.charm import RelationBrokenEvent, RelationCreatedEvent from ops.framework import Object from ops.model import ActiveStatus, BlockedStatus @@ -188,8 +185,18 @@ def _on_mysql_root_relation_created(self, event: RelationCreatedEvent) -> None: password = self._get_or_set_password_in_peer_secrets(username) try: + root_password = self.charm.get_secret("app", ROOT_PASSWORD_KEY) + assert root_password, "Root password not set" self.charm._mysql.create_database(database) self.charm._mysql.create_user(username, password, "mysql-root-legacy-relation") + if not self.charm._mysql.does_mysql_user_exist("root", "%"): + # create `root@%` user if it doesn't exist + # this is needed for the `mysql-root` interface to work + self.charm._mysql.create_user( + "root", + root_password, + "mysql-root-legacy-relation", + ) self.charm._mysql.escalate_user_privileges("root") self.charm._mysql.escalate_user_privileges(username) except (MySQLCreateDatabaseError, MySQLCreateUserError, MySQLEscalateUserPrivilegesError): @@ -207,7 +214,7 @@ def _on_mysql_root_relation_created(self, event: RelationCreatedEvent) -> None: "host": primary_address.split(":")[0], "password": password, "port": "3306", - "root_password": self.charm.get_secret("app", ROOT_PASSWORD_KEY), + "root_password": root_password, "user": username, } diff --git a/src/upgrade.py b/src/upgrade.py index 835430cfc..a7b90c34b 100644 --- a/src/upgrade.py +++ b/src/upgrade.py @@ -191,12 +191,8 @@ def _on_stop(self, _) -> None: If upgrade is in progress, set unit status. """ - try: - if self.charm.unit_peer_data["unit-status"] == "removing": - # unit is being removed, noop - return - except KeyError: - # databag gone + if self.charm.removing_unit: + # unit is being removed, noop return if self.upgrade_stack: # upgrade stack set, pre-upgrade-check ran @@ -207,7 +203,7 @@ def _on_upgrade_changed(self, _) -> None: Run update status for every unit when the upgrade is completed. """ - if not self.upgrade_stack and self.idle: + if not self.upgrade_stack and self.idle and self.charm.unit_initialized: self.charm._on_update_status(None) def _on_pebble_ready(self, event) -> None: diff --git a/src/utils.py b/src/utils.py index e0f2aa7c5..1ceb5269e 100644 --- a/src/utils.py +++ b/src/utils.py @@ -3,7 +3,6 @@ """A collection of utility functions that are used in the charm.""" -import hashlib import re import secrets import string @@ -21,16 +20,6 @@ def generate_random_password(length: int) -> str: return "".join([secrets.choice(choices) for _ in range(length)]) -def generate_random_hash() -> str: - """Generate a hash based on a random string. - - Returns: - A hash based on a random string. - """ - random_characters = generate_random_password(10) - return hashlib.md5(random_characters.encode("utf-8")).hexdigest() - - def split_mem(mem_str) -> tuple: """Split a memory string into a number and a unit. 
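A small sketch (illustrative only, not part of this diff) of how the naming defaults now fit together on leader election: one random hash seeds both the cluster and cluster-set names unless the corresponding config options are set. The helper below merely mirrors the generate_random_hash() removed above, which the charm now obtains from its shared base class as self.generate_random_hash(); the config stand-ins are hypothetical.

import hashlib
import secrets
import string


def random_hash() -> str:
    """Stand-in for the removed generate_random_hash(): md5 hex digest of a short random string."""
    random_chars = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(10))
    return hashlib.md5(random_chars.encode("utf-8")).hexdigest()


config_cluster_name = None      # stands in for self.config.cluster_name
config_cluster_set_name = None  # stands in for self.config.cluster_set_name

common_hash = random_hash()
cluster_name = config_cluster_name or f"cluster-{common_hash}"
cluster_set_name = config_cluster_set_name or f"cluster-set-{common_hash}"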
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 53faf62b1..4f6f6569b 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -11,6 +11,7 @@ import mysql.connector import yaml +from juju.model import Model from juju.unit import Unit from mysql.connector.errors import ( DatabaseError, @@ -40,37 +41,51 @@ def generate_random_string(length: int) -> str: return "".join([secrets.choice(choices) for i in range(length)]) -async def get_unit_address(ops_test: OpsTest, unit_name: str) -> str: +async def get_unit_address( + ops_test: OpsTest, unit_name: str, model: Optional[Model] = None +) -> str: """Get unit IP address. Args: ops_test: The ops test framework instance unit_name: The name of the unit + model: (Optional) model to use instead of ops_test.model Returns: IP address of the unit """ - status = await ops_test.model.get_status() + if model is None: + model = ops_test.model + status = await model.get_status() return status["applications"][unit_name.split("/")[0]].units[unit_name]["address"] -async def get_cluster_status(ops_test: OpsTest, unit: Unit) -> Dict: +async def get_cluster_status(unit: Unit, cluster_set=False) -> Dict: """Get the cluster status by running the get-cluster-status action. Args: - ops_test: The ops test framework unit: The unit on which to execute the action on + cluster_set: A boolean indicating whether to get the cluster set status Returns: A dictionary representing the cluster status """ - results = await juju_.run_action(unit, "get-cluster-status") + if cluster_set: + results = await juju_.run_action( + unit, "get-cluster-status", **{"--wait": "5m", "cluster-set": True} + ) + else: + results = await juju_.run_action(unit, "get-cluster-status") return results.get("status", {}) -async def get_leader_unit(ops_test: OpsTest, app_name: str) -> Optional[Unit]: +async def get_leader_unit( + ops_test: OpsTest, app_name: str, model: Optional[Model] = None +) -> Optional[Unit]: leader_unit = None - for unit in ops_test.model.applications[app_name].units: + if model is None: + model = ops_test.model + for unit in model.applications[app_name].units: if await unit.is_leader_from_status(): leader_unit = unit break @@ -127,7 +142,7 @@ async def get_primary_unit( Returns: A juju unit that is a MySQL primary """ - cluster_status = await get_cluster_status(ops_test, unit) + cluster_status = await get_cluster_status(unit) primary_label = [ label diff --git a/tests/integration/high_availability/conftest.py b/tests/integration/high_availability/conftest.py index fb4d5f3a6..5c161e85a 100644 --- a/tests/integration/high_availability/conftest.py +++ b/tests/integration/high_availability/conftest.py @@ -23,11 +23,14 @@ async def continuous_writes(ops_test: OpsTest) -> None: """Starts continuous writes to the MySQL cluster for a test and clear the writes at the end.""" application_unit = ops_test.model.applications[APPLICATION_DEFAULT_APP_NAME].units[0] + logger.info("Clearing continuous writes") await juju_.run_action(application_unit, "clear-continuous-writes") + logger.info("Starting continuous writes") await juju_.run_action(application_unit, "start-continuous-writes") yield + logger.info("Clearing continuous writes") await juju_.run_action(application_unit, "clear-continuous-writes") diff --git a/tests/integration/high_availability/high_availability_helpers.py b/tests/integration/high_availability/high_availability_helpers.py index b1e07c13e..97b2502f8 100644 --- a/tests/integration/high_availability/high_availability_helpers.py +++ 
b/tests/integration/high_availability/high_availability_helpers.py @@ -11,18 +11,12 @@ import kubernetes import yaml +from juju.model import Model from juju.unit import Unit from lightkube import Client from lightkube.resources.apps_v1 import StatefulSet from pytest_operator.plugin import OpsTest -from tenacity import ( - RetryError, - Retrying, - retry, - stop_after_attempt, - stop_after_delay, - wait_fixed, -) +from tenacity import RetryError, Retrying, retry, stop_after_attempt, stop_after_delay, wait_fixed from ..helpers import ( execute_queries_on_unit, @@ -72,7 +66,7 @@ async def get_max_written_value_in_database(ops_test: OpsTest, unit: Unit) -> in return output[0] -def get_application_name(ops_test: OpsTest, application_name_substring: str) -> str: +def get_application_name(ops_test: OpsTest, application_name_substring: str) -> Optional[str]: """Returns the name of the application witt the provided application name. This enables us to retrieve the name of the deployed application in an existing model. @@ -106,7 +100,7 @@ async def ensure_n_online_mysql_members( try: for attempt in Retrying(stop=stop_after_delay(5 * 60), wait=wait_fixed(10)): with attempt: - cluster_status = await get_cluster_status(ops_test, mysql_unit) + cluster_status = await get_cluster_status(mysql_unit) online_members = [ label for label, member in cluster_status["defaultreplicaset"]["topology"].items() @@ -123,6 +117,7 @@ async def deploy_and_scale_mysql( check_for_existing_application: bool = True, mysql_application_name: str = MYSQL_DEFAULT_APP_NAME, num_units: int = 3, + model: Optional[Model] = None, ) -> str: """Deploys and scales the mysql application charm. @@ -132,11 +127,14 @@ async def deploy_and_scale_mysql( in the model mysql_application_name: The name of the mysql application if it is to be deployed num_units: The number of units to deploy + model: The model to deploy the mysql application to """ application_name = get_application_name(ops_test, "mysql") + if not model: + model = ops_test.model if check_for_existing_application and application_name: - if len(ops_test.model.applications[application_name].units) != num_units: + if len(model.applications[application_name].units) != num_units: async with ops_test.fast_forward("60s"): await scale_application(ops_test, application_name, num_units) @@ -302,12 +300,12 @@ async def high_availability_test_setup(ops_test: OpsTest) -> Tuple[str, str]: async def send_signal_to_pod_container_process( - ops_test: OpsTest, unit_name: str, container_name: str, process: str, signal_code: str + model_name: str, unit_name: str, container_name: str, process: str, signal_code: str ) -> None: """Send the specified signal to a pod container process. 
Args: - ops_test: The ops test framework + model_name: The juju model name unit_name: The name of the unit to send signal to container_name: The name of the container to send signal to process: The name of the process to send signal to @@ -321,7 +319,7 @@ async def send_signal_to_pod_container_process( response = kubernetes.stream.stream( kubernetes.client.api.core_v1_api.CoreV1Api().connect_get_namespaced_pod_exec, pod_name, - ops_test.model.info.name, + model_name, container=container_name, command=send_signal_command.split(), stdin=False, @@ -459,20 +457,25 @@ async def clean_up_database_and_table( async def ensure_all_units_continuous_writes_incrementing( - ops_test: OpsTest, mysql_units: Optional[List[Unit]] = None + ops_test: OpsTest, + mysql_units: Optional[List[Unit]] = None, + mysql_application_name: Optional[str] = None, ) -> None: """Ensure that continuous writes is incrementing on all units. Also, ensure that all continuous writes up to the max written value is available on all units (ensure that no committed data is lost). """ - mysql_application_name = get_application_name(ops_test, "mysql") + if not mysql_application_name: + mysql_application_name = get_application_name(ops_test, "mysql") if not mysql_units: mysql_units = ops_test.model.applications[mysql_application_name].units primary = await get_primary_unit(ops_test, mysql_units[0], mysql_application_name) + assert primary, "Primary unit not found" + last_max_written_value = await get_max_written_value_in_database(ops_test, primary) select_all_continuous_writes_sql = [f"SELECT * FROM `{DATABASE_NAME}`.`{TABLE_NAME}`"] @@ -551,7 +554,7 @@ async def wait_until_units_in_status( ops_test: OpsTest, units_to_check: List[Unit], online_unit: Unit, status: str ) -> None: """Waits until all units specified are in a given status, or timeout occurs.""" - cluster_status = await get_cluster_status(ops_test, online_unit) + cluster_status = await get_cluster_status(online_unit) for unit in units_to_check: assert ( diff --git a/tests/integration/high_availability/test_async_replication.py b/tests/integration/high_availability/test_async_replication.py new file mode 100644 index 000000000..d666f897e --- /dev/null +++ b/tests/integration/high_availability/test_async_replication.py @@ -0,0 +1,376 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + + +import logging +from asyncio import gather +from pathlib import Path +from time import sleep +from typing import Optional + +import pytest +import yaml +from juju.model import Model +from pytest_operator.plugin import OpsTest + +from .. 
import juju_ +from ..helpers import ( + execute_queries_on_unit, + get_cluster_status, + get_leader_unit, + get_relation_data, + get_unit_address, +) +from ..markers import juju3 +from .high_availability_helpers import ( + DATABASE_NAME, + TABLE_NAME, + send_signal_to_pod_container_process, +) + +logger = logging.getLogger(__name__) +MYSQL_APP1 = "db1" +MYSQL_APP2 = "db2" +MYSQL_ROUTER_APP_NAME = "mysql-router-k8s" +APPLICATION_APP_NAME = "mysql-test-app" + +MYSQL_CONTAINER_NAME = "mysql" +MYSQLD_PROCESS_NAME = "mysqld" + +METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) +MINUTE = 60 + + +@pytest.fixture(scope="module") +def first_model(ops_test: OpsTest) -> Optional[Model]: + """Return the first model.""" + first_model = ops_test.model + return first_model + + +@pytest.fixture(scope="module") +async def second_model( + ops_test: OpsTest, first_model, request +) -> Model: # pyright: ignore [reportInvalidTypeForm] + """Create and return the second model.""" + second_model_name = f"{first_model.info.name}-other" + await ops_test._controller.add_model(second_model_name) + second_model = Model() + await second_model.connect(model_name=second_model_name) + yield second_model # pyright: ignore [reportReturnType] + + if request.config.getoption("--keep-models"): + return + logger.info("Destroying second model") + await ops_test._controller.destroy_model(second_model_name, destroy_storage=True) + + +@juju3 +@pytest.mark.abort_on_fail +@pytest.mark.group(1) +async def test_build_and_deploy( + ops_test: OpsTest, first_model: Model, second_model: Model +) -> None: + """Simple test to ensure that the mysql and application charms get deployed.""" + logger.info("Build mysql charm") + charm = await ops_test.build_charm(".") + + config = {"cluster-name": "lima", "profile": "testing"} + resources = {"mysql-image": METADATA["resources"]["mysql-image"]["upstream-source"]} + + logger.info("Deploying mysql clusters") + await first_model.deploy( + charm, + application_name=MYSQL_APP1, + num_units=3, + config=config, + resources=resources, + trust=True, + ) + config["cluster-name"] = "cuzco" + await second_model.deploy( + charm, + application_name=MYSQL_APP2, + num_units=3, + config=config, + resources=resources, + trust=True, + ) + + logger.info("Waiting for the applications to settle") + await gather( + first_model.wait_for_idle( + apps=[MYSQL_APP1], + status="active", + timeout=10 * MINUTE, + ), + second_model.wait_for_idle( + apps=[MYSQL_APP2], + status="active", + timeout=10 * MINUTE, + ), + ) + + +@juju3 +@pytest.mark.abort_on_fail +@pytest.mark.group(1) +async def test_async_relate(first_model: Model, second_model: Model) -> None: + """Relate the two mysql clusters.""" + logger.info("Creating offers in first model") + await first_model.create_offer(f"{MYSQL_APP1}:async-primary") + + logger.info("Consume offer in second model") + await second_model.consume(endpoint=f"admin/{first_model.info.name}.{MYSQL_APP1}") + + logger.info("Relating the two mysql clusters") + await second_model.integrate(f"{MYSQL_APP1}", f"{MYSQL_APP2}:async-replica") + + logger.info("Waiting for the applications to settle") + await gather( + first_model.wait_for_idle( + apps=[MYSQL_APP1], + status="active", + timeout=10 * MINUTE, + ), + second_model.wait_for_idle( + apps=[MYSQL_APP2], + status="active", + timeout=10 * MINUTE, + ), + ) + + +@juju3 +@pytest.mark.abort_on_fail +@pytest.mark.group(1) +async def test_deploy_router_and_app(first_model: Model) -> None: + """Deploy the router and the test application.""" + 
logger.info("Deploying router and application") + await first_model.deploy( + MYSQL_ROUTER_APP_NAME, + application_name=MYSQL_ROUTER_APP_NAME, + series="jammy", + channel="8.0/stable", + num_units=1, + trust=True, + ) + await first_model.deploy( + APPLICATION_APP_NAME, + application_name=APPLICATION_APP_NAME, + series="jammy", + channel="latest/stable", + num_units=1, + ) + + logger.info("Relate app and router") + await first_model.integrate( + APPLICATION_APP_NAME, + MYSQL_ROUTER_APP_NAME, + ) + logger.info("Relate router and db") + await first_model.integrate(MYSQL_ROUTER_APP_NAME, MYSQL_APP1) + + await first_model.wait_for_idle( + apps=[MYSQL_ROUTER_APP_NAME, APPLICATION_APP_NAME], + timeout=10 * MINUTE, + raise_on_error=False, + ) + + +@juju3 +@pytest.mark.abort_on_fail +@pytest.mark.group(1) +async def test_data_replication( + first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test to write to primary, and read the same data back from replicas.""" + results = await get_max_written_value(first_model, second_model) + assert len(results) == 6, f"Expected 6 results, got {len(results)}" + assert all(x == results[0] for x in results), "Data is not consistent across units" + assert results[0] > 1, "No data was written to the database" + + +@juju3 +@pytest.mark.abort_on_fail +@pytest.mark.group(1) +async def test_standby_promotion( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test graceful promotion of a standby cluster to primary.""" + leader_unit = await get_leader_unit(None, MYSQL_APP2, second_model) + + assert leader_unit is not None, "No leader unit found on standby cluster" + + relation_data = await get_relation_data(ops_test, MYSQL_APP1, "database-peers") + cluster_set_name = relation_data[0]["application-data"]["cluster-set-domain-name"] + logger.info("Promoting standby cluster to primary") + await juju_.run_action( + leader_unit, "promote-standby-cluster", **{"cluster-set-name": cluster_set_name} + ) + + results = await get_max_written_value(first_model, second_model) + assert len(results) == 6, f"Expected 6 results, got {len(results)}" + assert all(x == results[0] for x in results), "Data is not consistent across units" + assert results[0] > 1, "No data was written to the database" + + cluster_set_status = await get_cluster_status(leader_unit, cluster_set=True) + assert ( + cluster_set_status["clusters"]["cuzco"]["clusterrole"] == "primary" + ), "standby not promoted to primary" + + +@juju3 +@pytest.mark.abort_on_fail +@pytest.mark.group(1) +async def test_failover(ops_test: OpsTest, first_model: Model, second_model: Model) -> None: + """Test switchover on primary cluster fail.""" + logger.info("Freezing mysqld on primary cluster units") + second_model_units = second_model.applications[MYSQL_APP2].units + + # simulating a failure on the primary cluster + for unit in second_model_units: + await send_signal_to_pod_container_process( + second_model.info.name, unit.name, MYSQL_CONTAINER_NAME, MYSQLD_PROCESS_NAME, "SIGSTOP" + ) + + logger.info("Promoting standby cluster to primary with force flag") + leader_unit = await get_leader_unit(None, MYSQL_APP1, first_model) + assert leader_unit is not None, "No leader unit found" + relation_data = await get_relation_data(ops_test, MYSQL_APP1, "database-peers") + cluster_set_name = relation_data[0]["application-data"]["cluster-set-domain-name"] + await juju_.run_action( + leader_unit, + "promote-standby-cluster", + **{"--wait": "5m", "cluster-set-name": cluster_set_name, 
"force": True}, + ) + + cluster_set_status = await get_cluster_status(leader_unit, cluster_set=True) + assert ( + cluster_set_status["clusters"]["lima"]["clusterrole"] == "primary" + ), "standby not promoted to primary" + assert ( + cluster_set_status["clusters"]["cuzco"]["globalstatus"] == "invalidated" + ), "old primary not invalidated" + + # restore mysqld process + for unit in second_model_units: + await send_signal_to_pod_container_process( + second_model.info.name, unit.name, MYSQL_CONTAINER_NAME, MYSQLD_PROCESS_NAME, "SIGCONT" + ) + + +@juju3 +@pytest.mark.abort_on_fail +@pytest.mark.group(1) +async def test_rejoin_invalidated_cluster( + first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test rejoin invalidated cluster with.""" + leader_unit = await get_leader_unit(None, MYSQL_APP1, first_model) + assert leader_unit is not None, "No leader unit found" + await juju_.run_action( + leader_unit, + "rejoin-cluster", + **{"--wait": "5m", "cluster-name": "cuzco"}, + ) + results = await get_max_written_value(first_model, second_model) + assert len(results) == 6, f"Expected 6 results, got {len(results)}" + assert all(x == results[0] for x in results), "Data is not consistent across units" + assert results[0] > 1, "No data was written to the database" + + +@juju3 +@pytest.mark.abort_on_fail +@pytest.mark.group(1) +async def test_remove_relation_and_relate( + first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test removing and re-relating the two mysql clusters.""" + logger.info("Stopping continuous writes after 5s") + # part 1/2 of workaround for https://github.com/canonical/mysql-k8s-operator/issues/399 + # sleep is need to ensure there is enough time for the `continuous_writes` database be + # created/populated (by the fixture) before stopping the continuous writes + sleep(5) + application_unit = first_model.applications[APPLICATION_APP_NAME].units[0] + await juju_.run_action(application_unit, "stop-continuous-writes") + + logger.info("Remove async relation") + await second_model.applications[MYSQL_APP2].remove_relation( + f"{MYSQL_APP2}:async-replica", MYSQL_APP1 + ) + + second_model_units = second_model.applications[MYSQL_APP2].units + logger.info("waiting for units to be blocked") + await second_model.block_until( + lambda: all(unit.workload_status == "blocked" for unit in second_model_units), + timeout=10 * MINUTE, + ) + + logger.info("Waiting for the applications to settle") + await gather( + first_model.wait_for_idle( + apps=[MYSQL_APP1], + status="active", + timeout=10 * MINUTE, + ), + second_model.wait_for_idle( + apps=[MYSQL_APP2], + status="blocked", + timeout=10 * MINUTE, + ), + ) + + logger.info("Re relating the two mysql clusters") + await second_model.integrate(f"{MYSQL_APP1}", f"{MYSQL_APP2}:async-replica") + + logger.info("Waiting for the applications to settle") + await gather( + first_model.wait_for_idle( + apps=[MYSQL_APP1], + status="active", + timeout=10 * MINUTE, + ), + second_model.wait_for_idle( + apps=[MYSQL_APP2], + status="active", + timeout=10 * MINUTE, + ), + ) + + # part 2/2 of workaround for https://github.com/canonical/mysql-k8s-operator/issues/399 + await juju_.run_action(application_unit, "start-continuous-writes") + + results = await get_max_written_value(first_model, second_model) + assert len(results) == 6, f"Expected 6 results, got {len(results)}" + assert all(x == results[0] for x in results), "Data is not consistent across units" + assert results[0] > 1, "No data was written to the database" + + +async 
def get_max_written_value(first_model: Model, second_model: Model) -> list[int]: + """Return list with max written value from all units.""" + select_max_written_value_sql = [f"SELECT MAX(number) FROM `{DATABASE_NAME}`.`{TABLE_NAME}`;"] + logger.info("Testing data replication") + first_model_units = first_model.applications[MYSQL_APP1].units + second_model_units = second_model.applications[MYSQL_APP2].units + credentials = await juju_.run_action( + first_model_units[0], "get-password", username="serverconfig" + ) + + logger.info("Stopping continuous writes and wait (5s) for replication") + application_unit = first_model.applications[APPLICATION_APP_NAME].units[0] + await juju_.run_action(application_unit, "stop-continuous-writes") + + sleep(5) + results = list() + + logger.info("Querying max value on all units") + for unit in first_model_units + second_model_units: + address = await get_unit_address(None, unit.name, unit.model) + values = await execute_queries_on_unit( + address, credentials["username"], credentials["password"], select_max_written_value_sql + ) + results.append(values[0]) + + return results diff --git a/tests/integration/high_availability/test_self_healing.py b/tests/integration/high_availability/test_self_healing.py index d1fec49ea..2e87f187c 100644 --- a/tests/integration/high_availability/test_self_healing.py +++ b/tests/integration/high_availability/test_self_healing.py @@ -70,7 +70,7 @@ async def test_kill_db_process(ops_test: OpsTest, continuous_writes) -> None: logger.info(f"Sending SIGKILL to unit {primary.name}") await send_signal_to_pod_container_process( - ops_test, + ops_test.model.info.name, primary.name, MYSQL_CONTAINER_NAME, MYSQLD_PROCESS_NAME, @@ -134,7 +134,7 @@ async def test_freeze_db_process(ops_test: OpsTest, continuous_writes) -> None: logger.info(f"Sending SIGSTOP to unit {primary.name}") await send_signal_to_pod_container_process( - ops_test, + ops_test.model.info.name, primary.name, MYSQL_CONTAINER_NAME, MYSQLD_PROCESS_NAME, @@ -180,7 +180,7 @@ async def test_freeze_db_process(ops_test: OpsTest, continuous_writes) -> None: logger.info(f"Sending SIGCONT to {primary.name}") await send_signal_to_pod_container_process( - ops_test, + ops_test.model.info.name, primary.name, MYSQL_CONTAINER_NAME, MYSQLD_PROCESS_NAME, @@ -253,7 +253,7 @@ async def test_graceful_crash_of_primary(ops_test: OpsTest, continuous_writes) - logger.info(f"Sending SIGTERM to unit {primary.name}") await send_signal_to_pod_container_process( - ops_test, + ops_test.model.info.name, primary.name, MYSQL_CONTAINER_NAME, MYSQLD_PROCESS_NAME, @@ -325,7 +325,7 @@ async def test_network_cut_affecting_an_instance( await wait_until_units_in_status(ops_test, [primary], remaining_units[0], "(missing)") await wait_until_units_in_status(ops_test, remaining_units, remaining_units[0], "online") - cluster_status = await get_cluster_status(ops_test, remaining_units[0]) + cluster_status = await get_cluster_status(remaining_units[0]) isolated_primary_status, isolated_primary_memberrole = [ (member["status"], member["memberrole"]) @@ -357,7 +357,7 @@ async def test_network_cut_affecting_an_instance( logger.info("Wait until all units are online") await wait_until_units_in_status(ops_test, mysql_units, mysql_units[0], "online") - new_cluster_status = await get_cluster_status(ops_test, mysql_units[0]) + new_cluster_status = await get_cluster_status(mysql_units[0]) logger.info("Ensure isolated instance is now secondary") isolated_primary_status, isolated_primary_memberrole = [ @@ -439,7 +439,7 @@ async def 
test_graceful_full_cluster_crash_test(ops_test: OpsTest, continuous_wr )
         assert new_pid > unit_mysqld_pids[unit.name], "The mysqld process did not restart"
 
-    cluster_status = await get_cluster_status(ops_test, mysql_units[0])
+    cluster_status = await get_cluster_status(mysql_units[0])
     for member in cluster_status["defaultreplicaset"]["topology"].values():
         assert member["status"] == "online"
diff --git a/tests/integration/high_availability/test_upgrade.py b/tests/integration/high_availability/test_upgrade.py
index e90525999..95dd6c636 100644
--- a/tests/integration/high_availability/test_upgrade.py
+++ b/tests/integration/high_availability/test_upgrade.py
@@ -1,7 +1,6 @@
 # Copyright 2023 Canonical Ltd.
 # See LICENSE file for licensing details.
 
-import asyncio
 import json
 import logging
 import shutil
@@ -33,29 +32,28 @@
 TIMEOUT = 15 * 60
 MYSQL_APP_NAME = "mysql-k8s"
-TEST_APP_NAME = "test-app"
+TEST_APP_NAME = "mysql-test-app"
 
 
 @pytest.mark.group(1)
 @pytest.mark.abort_on_fail
 async def test_deploy_latest(ops_test: OpsTest) -> None:
     """Simple test to ensure that the mysql and application charms get deployed."""
-    await asyncio.gather(
-        ops_test.model.deploy(
-            MYSQL_APP_NAME,
-            application_name=MYSQL_APP_NAME,
-            num_units=3,
-            channel="8.0/edge",
-            trust=True,
-            config={"profile": "testing"},
-        ),
-        ops_test.model.deploy(
-            f"mysql-{TEST_APP_NAME}",
-            application_name=TEST_APP_NAME,
-            num_units=1,
-            channel="latest/edge",
-        ),
+    await ops_test.model.deploy(
+        MYSQL_APP_NAME,
+        application_name=MYSQL_APP_NAME,
+        num_units=3,
+        channel="8.0/edge",
+        trust=True,
+        config={"profile": "testing"},
     )
+    await ops_test.model.deploy(
+        TEST_APP_NAME,
+        application_name=TEST_APP_NAME,
+        num_units=1,
+        channel="latest/edge",
+    )
+
+    await relate_mysql_and_application(ops_test, MYSQL_APP_NAME, TEST_APP_NAME)
 
     logger.info("Wait for applications to become active")
     await ops_test.model.wait_for_idle(
diff --git a/tests/integration/juju_.py b/tests/integration/juju_.py
index ced8956a0..b140e3e71 100644
--- a/tests/integration/juju_.py
+++ b/tests/integration/juju_.py
@@ -11,12 +11,14 @@
 _libjuju_version = importlib.metadata.version("juju")
 has_secrets = ops.JujuVersion(_libjuju_version).has_secrets
+juju_major_version = int(_libjuju_version.split(".")[0])
+
 
 
 async def run_action(unit: juju.unit.Unit, action_name, **params):
     action = await unit.run_action(action_name=action_name, **params)
     result = await action.wait()
     # Syntax changed across libjuju major versions
-    if int(_libjuju_version.split(".")[0]) <= 2:
+    if juju_major_version <= 2:
         assert result.results.get("Code") == "0"
     else:
         assert result.results.get("return-code") == 0
diff --git a/tests/integration/markers.py b/tests/integration/markers.py
index efa6d5064..30bc03e50 100644
--- a/tests/integration/markers.py
+++ b/tests/integration/markers.py
@@ -11,3 +11,4 @@
 only_without_juju_secrets = pytest.mark.skipif(
     juju_.has_secrets, reason="Requires juju version w/o secrets"
 )
+juju3 = pytest.mark.skipif(juju_.juju_major_version < 3, reason="Requires juju 3+")
diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py
index b8c8399ff..f47cd00c6 100644
--- a/tests/integration/test_charm.py
+++ b/tests/integration/test_charm.py
@@ -153,7 +153,7 @@ async def test_scale_up_and_down(ops_test: OpsTest) -> None:
 
     await scale_application(ops_test, APP_NAME, 5)
 
-    cluster_status = await get_cluster_status(ops_test, random_unit)
+    cluster_status = await get_cluster_status(random_unit)
     online_member_addresses = [
         member["address"]
         for _, member in cluster_status["defaultreplicaset"]["topology"].items()
@@ -173,7 +173,7 @@ async def test_scale_up_and_down(ops_test: OpsTest) -> None:
     )
 
     random_unit = ops_test.model.applications[APP_NAME].units[0]
-    cluster_status = await get_cluster_status(ops_test, random_unit)
+    cluster_status = await get_cluster_status(random_unit)
     online_member_addresses = [
         member["address"]
         for _, member in cluster_status["defaultreplicaset"]["topology"].items()
@@ -198,7 +198,7 @@ async def test_scale_up_after_scale_down(ops_test: OpsTest) -> None:
 
     await scale_application(ops_test, APP_NAME, 3)
 
-    cluster_status = await get_cluster_status(ops_test, random_unit)
+    cluster_status = await get_cluster_status(random_unit)
     online_member_addresses = [
         member["address"]
         for _, member in cluster_status["defaultreplicaset"]["topology"].items()
diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py
index ff75d67ad..ffaff2497 100644
--- a/tests/unit/test_charm.py
+++ b/tests/unit/test_charm.py
@@ -113,6 +113,8 @@ def test_on_leader_elected_secrets(self):
                 secret_data[password].isalnum() and len(secret_data[password]) == PASSWORD_LENGTH
             )
 
+    @patch("mysql_k8s_helpers.MySQL.rescan_cluster")
+    @patch("charms.mysql.v0.mysql.MySQLCharmBase.active_status_message", return_value="")
     @patch("upgrade.MySQLK8sUpgrade.idle", return_value=True)
     @patch("mysql_k8s_helpers.MySQL.write_content_to_file")
     @patch("mysql_k8s_helpers.MySQL.is_data_dir_initialised", return_value=False)
@@ -151,7 +153,9 @@ def test_mysql_pebble_ready(
         _is_data_dir_initialised,
         _create_cluster_set,
         _write_content_to_file,
+        _active_status_message,
         _upgrade_idle,
+        _rescan_cluster,
     ):
         # Check if initial plan is empty
         self.harness.set_can_connect("mysql", True)
@@ -169,12 +173,16 @@ def test_mysql_pebble_ready(
 
         # After configuration run, plan should be populated
         plan = self.harness.get_container_pebble_plan("mysql")
-        self.assertEqual(plan.to_dict()["services"], self.layer_dict()["services"])
+        self.assertEqual(
+            plan.to_dict()["services"],  # pyright: ignore[reportTypedDictNotRequiredAccess]
+            self.layer_dict()["services"],
+        )
 
         self.harness.add_relation("metrics-endpoint", "test-cos-app")
         plan = self.harness.get_container_pebble_plan("mysql")
         self.assertEqual(
-            plan.to_dict()["services"], self.layer_dict(with_mysqld_exporter=True)["services"]
+            plan.to_dict()["services"],  # pyright: ignore[reportTypedDictNotRequiredAccess]
+            self.layer_dict(with_mysqld_exporter=True)["services"],
         )
 
     @patch("charm.MySQLOperatorCharm.join_unit_to_cluster")
@@ -258,11 +266,59 @@ def test_mysql_property(self, _):
         mysql = self.charm._mysql
         self.assertTrue(isinstance(mysql, MySQL))
 
+    @patch("charm.MySQLOperatorCharm._on_leader_elected")
+    def test_get_secret(self, _):
+        self.harness.set_leader()
+
+        # Test application scope.
+        assert self.charm.get_secret("app", "password") is None
+        self.harness.update_relation_data(
+            self.peer_relation_id, self.charm.app.name, {"password": "test-password"}
+        )
+        assert self.charm.get_secret("app", "password") == "test-password"
+
+        # Test unit scope.
+        assert self.charm.get_secret("unit", "password") is None
+        self.harness.update_relation_data(
+            self.peer_relation_id, self.charm.unit.name, {"password": "test-password"}
+        )
+        assert self.charm.get_secret("unit", "password") == "test-password"
+
+    @pytest.mark.usefixtures("without_juju_secrets")
+    @patch("charm.MySQLOperatorCharm._on_leader_elected")
+    def test_set_secret_databag(self, _):
+        self.harness.set_leader()
+
+        # Test application scope.
+        assert "password" not in self.harness.get_relation_data(
+            self.peer_relation_id, self.charm.app.name
+        )
+        self.charm.set_secret("app", "password", "test-password")
+        assert (
+            self.harness.get_relation_data(self.peer_relation_id, self.charm.app.name)["password"]
+            == "test-password"
+        )
+
+        # Test unit scope.
+        assert "password" not in self.harness.get_relation_data(
+            self.peer_relation_id, self.charm.unit.name
+        )
+        self.charm.set_secret("unit", "password", "test-password")
+        assert (
+            self.harness.get_relation_data(self.peer_relation_id, self.charm.unit.name)["password"]
+            == "test-password"
+        )
+
+    @patch("charms.mysql.v0.mysql.MySQLBase.is_cluster_replica", return_value=False)
     @patch("mysql_k8s_helpers.MySQL.remove_instance")
     @patch("mysql_k8s_helpers.MySQL.get_primary_label")
     @patch("mysql_k8s_helpers.MySQL.is_instance_in_cluster", return_value=True)
     def test_database_storage_detaching(
-        self, mock_is_instance_in_cluster, mock_get_primary_label, mock_remove_instance
+        self,
+        mock_is_instance_in_cluster,
+        mock_get_primary_label,
+        mock_remove_instance,
+        mock_is_cluster_replica,
     ):
         self.harness.update_relation_data(
             self.peer_relation_id, self.charm.unit.name, {"unit-initialized": "True"}
@@ -275,7 +331,7 @@ def test_database_storage_detaching(
         mock_get_primary_label.return_value = self.charm.unit_label
 
         self.charm._on_database_storage_detaching(None)
-        mock_remove_instance.assert_called_once_with(self.charm.unit_label)
+        mock_remove_instance.assert_called_once_with(self.charm.unit_label, lock_instance=None)
 
         self.assertEqual(
             self.harness.get_relation_data(self.peer_relation_id, self.charm.unit.name)[
diff --git a/tests/unit/test_mysql_k8s_helpers.py b/tests/unit/test_mysql_k8s_helpers.py
index 92b08ae2c..204a0dd7b 100644
--- a/tests/unit/test_mysql_k8s_helpers.py
+++ b/tests/unit/test_mysql_k8s_helpers.py
@@ -6,7 +6,7 @@
 from unittest.mock import MagicMock, call, patch
 
 import tenacity
-from charms.mysql.v0.mysql import MySQLClientError, MySQLConfigureMySQLUsersError
+from charms.mysql.v0.mysql import MySQLClientError
 from ops.pebble import ExecError
 
 from mysql_k8s_helpers import (
@@ -102,55 +102,6 @@ def test_wait_until_mysql_connection(self, _container):
 
         self.assertTrue(not self.mysql.wait_until_mysql_connection(check_port=False))
 
-    @patch("mysql_k8s_helpers.MySQL._run_mysqlcli_script")
-    def test_configure_mysql_users(self, _run_mysqlcli_script):
-        """Test failed to configuring the MySQL users."""
-        privileges_to_revoke = (
-            "SYSTEM_USER",
-            "SYSTEM_VARIABLES_ADMIN",
-            "SUPER",
-            "REPLICATION_SLAVE_ADMIN",
-            "GROUP_REPLICATION_ADMIN",
-            "BINLOG_ADMIN",
-            "SET_USER_ID",
-            "ENCRYPTION_KEY_ADMIN",
-            "VERSION_TOKEN_ADMIN",
-            "CONNECTION_ADMIN",
-        )
-
-        _expected_configure_user_commands = "; ".join(
-            (
-                "CREATE USER 'root'@'%' IDENTIFIED BY 'password'",
-                "GRANT ALL ON *.* TO 'root'@'%' WITH GRANT OPTION",
-                "CREATE USER 'serverconfig'@'%' IDENTIFIED BY 'serverconfigpassword'",
-                "GRANT ALL ON *.* TO 'serverconfig'@'%' WITH GRANT OPTION",
-                "CREATE USER 'monitoring'@'%' IDENTIFIED BY 'monitoringpassword' WITH MAX_USER_CONNECTIONS 3",
-                "GRANT SYSTEM_USER, SELECT, PROCESS, SUPER, REPLICATION CLIENT, RELOAD ON *.* TO 'monitoring'@'%'",
-                "CREATE USER 'backups'@'%' IDENTIFIED BY 'backupspassword'",
-                "GRANT CONNECTION_ADMIN, BACKUP_ADMIN, PROCESS, RELOAD, LOCK TABLES, REPLICATION CLIENT ON *.* TO 'backups'@'%'",
-                "GRANT SELECT ON performance_schema.log_status TO 'backups'@'%'",
-                "GRANT SELECT ON performance_schema.keyring_component_status TO 'backups'@'%'",
-                "GRANT SELECT ON performance_schema.replication_group_members TO 'backups'@'%'",
-                "UPDATE mysql.user SET authentication_string=null WHERE User='root' and Host='localhost'",
-                "ALTER USER 'root'@'localhost' IDENTIFIED BY 'password'",
-                f"REVOKE {', '.join(privileges_to_revoke)} ON *.* FROM 'root'@'%'",
-                f"REVOKE {', '.join(privileges_to_revoke)} ON *.* FROM 'root'@'localhost'",
-                "FLUSH PRIVILEGES",
-            )
-        )
-
-        self.mysql.configure_mysql_users()
-
-        _run_mysqlcli_script.assert_called_once_with(_expected_configure_user_commands)
-
-    @patch("mysql_k8s_helpers.MySQL._run_mysqlcli_script")
-    def test_configure_mysql_users_exception(self, _run_mysqlcli_script):
-        """Test exceptions trying to configuring the MySQL users."""
-        _run_mysqlcli_script.side_effect = MySQLClientError("Error running mysql")
-
-        with self.assertRaises(MySQLConfigureMySQLUsersError):
-            self.mysql.configure_mysql_users()
-
     @patch("mysql_k8s_helpers.MySQL._run_mysqlsh_script")
     def test_create_database(self, _run_mysqlsh_script):
         """Test successful execution of create_database."""
diff --git a/tests/unit/test_upgrade.py b/tests/unit/test_upgrade.py
index 5e81b636d..f806c4390 100644
--- a/tests/unit/test_upgrade.py
+++ b/tests/unit/test_upgrade.py
@@ -5,10 +5,7 @@
 import unittest
 from unittest.mock import call, patch
 
-from charms.data_platform_libs.v0.upgrade import (
-    ClusterNotReadyError,
-    KubernetesClientError,
-)
+from charms.data_platform_libs.v0.upgrade import ClusterNotReadyError, KubernetesClientError
 from charms.mysql.v0.mysql import MySQLSetClusterPrimaryError, MySQLSetVariableError
 from ops.model import BlockedStatus
 from ops.testing import Harness