From 1b40dfd4882fa719089f8daed77e46ac2fe22ca5 Mon Sep 17 00:00:00 2001
From: Paulo Machado
Date: Thu, 15 Jun 2023 08:59:21 -0300
Subject: [PATCH] Fix 9+ members in waiting and cluster join refactoring (#229)

* initial refactor
* won't fail when failed to acquire lock
* fixes and remove deprecated handler
* extracted function to decrease complexity
* extracted function to minimize complexity
* minimal adjustment to pass tests
* removed unused import
* better messaging
* decrease secondary cluster node count to speed up test
* PR comments
* deprecated key
* using new function
* fix missing cleanup
* extract function
* s/block/wait on max size and replan to start all services
* remove redundant handlers to avoid race conditions
* optimize db access calls
* consolidate calls
* minimize overhead of pebble calls
* fix tests
* tests require more memory
* temp ci debug
* updated and extended optimization approach
* re-add code lost on merge from main
* change test criteria to be more broad

  Tests pass with juju 2.9.29 and 2.9.42

* not flaky anymore
* remove upterm
* formatting
* test hack to confirm a theory
* stricter constraint to allow pods being scheduled
* recategorize test as unstable
* minimal viable mem constraint
* memory constraint applied only for mysql
* unused import
* restore tests
* remove agent constraint
* lower mem restriction
* removed 2.9.29 hack
* decrease mem
* more mem tweak
* CI test
---
 .github/workflows/ci.yaml | 2 -
 lib/charms/mysql/v0/mysql.py | 139 +++++--
 src/charm.py | 344 +++++++++++-------
 src/constants.py | 1 +
 src/mysql_k8s_helpers.py | 38 +-
 src/relations/mysql_provider.py | 75 +---
 tests/integration/conftest.py | 11 +-
 .../high_availability/test_replication.py | 1 +
 tests/integration/test_charm.py | 3 -
 tests/unit/test_charm.py | 21 +-
 tests/unit/test_mysql_k8s_helpers.py | 38 +-
 11 files changed, 363 insertions(+), 310 deletions(-)

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 5e8031293..3ca596b4b 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -61,8 +61,6 @@ jobs:
         uses: charmed-kubernetes/actions-operator@main
         with:
           provider: microk8s
-          # This is needed until https://bugs.launchpad.net/juju/+bug/1977582 is fixed
-          bootstrap-options: "--agent-version 2.9.29"
       - name: Download packed charm(s)
         uses: actions/download-artifact@v3
         with:
diff --git a/lib/charms/mysql/v0/mysql.py b/lib/charms/mysql/v0/mysql.py
index 62a692b81..b7d90a46a 100644
--- a/lib/charms/mysql/v0/mysql.py
+++ b/lib/charms/mysql/v0/mysql.py
@@ -91,9 +91,10 @@ def wait_until_mysql_connection(self) -> None:
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 32
+LIBPATCH = 34
 
 UNIT_TEARDOWN_LOCKNAME = "unit-teardown"
+UNIT_ADD_LOCKNAME = "unit-add"
 
 
 class Error(Exception):
@@ -290,6 +291,10 @@ class MySQLKillSessionError(Error):
     """Exception raised when there is an issue killing a connection."""
 
 
+class MySQLLockAcquisitionError(Error):
+    """Exception raised when a lock fails to be acquired."""
+
+
 class MySQLRescanClusterError(Error):
     """Exception raised when there is an issue rescanning the cluster."""
 
@@ -465,20 +470,18 @@ def configure_mysqlrouter_user(
             if there is an issue creating and configuring the mysqlrouter user
         """
         try:
-            primary_address = self.get_cluster_primary_address()
-
             escaped_mysqlrouter_user_attributes = json.dumps({"unit_name": unit_name}).replace(
                 '"', r"\""
             )
             # Using server_config_user as we are sure it has create user grants
create_mysqlrouter_user_commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')", + f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", f"session.run_sql(\"CREATE USER '{username}'@'{hostname}' IDENTIFIED BY '{password}' ATTRIBUTE '{escaped_mysqlrouter_user_attributes}';\")", ) # Using server_config_user as we are sure it has create user grants mysqlrouter_user_grant_commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')", + f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", f"session.run_sql(\"GRANT CREATE USER ON *.* TO '{username}'@'{hostname}' WITH GRANT OPTION;\")", f"session.run_sql(\"GRANT SELECT, INSERT, UPDATE, DELETE, EXECUTE ON mysql_innodb_cluster_metadata.* TO '{username}'@'{hostname}';\")", f"session.run_sql(\"GRANT SELECT ON mysql.user TO '{username}'@'{hostname}';\")", @@ -525,26 +528,28 @@ def create_application_database_and_scoped_user( if unit_name is not None: attributes["unit_name"] = unit_name try: - primary_address = self.get_cluster_primary_address() - # Using server_config_user as we are sure it has create database grants + connect_command = ( + f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", + ) create_database_commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')", f'session.run_sql("CREATE DATABASE IF NOT EXISTS `{database_name}`;")', ) escaped_user_attributes = json.dumps(attributes).replace('"', r"\"") # Using server_config_user as we are sure it has create user grants create_scoped_user_commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')", f"session.run_sql(\"CREATE USER `{username}`@`{hostname}` IDENTIFIED BY '{password}' ATTRIBUTE '{escaped_user_attributes}';\")", f'session.run_sql("GRANT USAGE ON *.* TO `{username}`@`{hostname}`;")', f'session.run_sql("GRANT ALL PRIVILEGES ON `{database_name}`.* TO `{username}`@`{hostname}`;")', ) if create_database: - self._run_mysqlsh_script("\n".join(create_database_commands)) - self._run_mysqlsh_script("\n".join(create_scoped_user_commands)) + commands = connect_command + create_database_commands + create_scoped_user_commands + else: + commands = connect_command + create_scoped_user_commands + + self._run_mysqlsh_script("\n".join(commands)) except MySQLClientError as e: logger.exception( f"Failed to create application database {database_name} and scoped user {username}@{hostname}", @@ -603,12 +608,9 @@ def delete_users_for_unit(self, unit_name: str) -> None: Raises: MySQLDeleteUsersForUnitError if there is an error deleting users for the unit """ - primary_address = self.get_cluster_primary_address() - if not primary_address: - raise MySQLDeleteUsersForUnitError("Unable to query cluster primary address") # Using server_config_user as we are sure it has drop user grants drop_users_command = [ - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')", + f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", ] drop_users_command.extend( self._get_statements_to_delete_users_with_attribute("unit_name", f"'{unit_name}'") @@ -629,11 +631,8 @@ def delete_users_for_relation(self, relation_id: int) -> None: MySQLDeleteUsersForRelationError if 
there is an error deleting users for the relation """ user = f"relation-{str(relation_id)}" - primary_address = self.get_cluster_primary_address() - if not primary_address: - raise MySQLDeleteUsersForRelationError("Unable to query cluster primary address") drop_users_command = [ - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')", + f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", f"session.run_sql(\"DROP USER IF EXISTS '{user}'@'%';\")", ] # If the relation is with a MySQL Router charm application, delete any users @@ -649,11 +648,8 @@ def delete_users_for_relation(self, relation_id: int) -> None: def delete_user(self, username: str) -> None: """Delete user.""" - primary_address = self.get_cluster_primary_address() - if not primary_address: - raise MySQLDeleteUserError("Unable to query cluster primary address") drop_user_command = [ - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')", + f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", f"session.run_sql(\"DROP USER `{username}`@'%'\")", ] try: @@ -664,11 +660,8 @@ def delete_user(self, username: str) -> None: def remove_router_from_cluster_metadata(self, router_id: str) -> None: """Remove MySQL Router from InnoDB Cluster metadata.""" - primary_address = self.get_cluster_primary_address() - if not primary_address: - raise MySQLRemoveRouterFromMetadataError("Unable to query cluster primary address") command = [ - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{primary_address}')", + f"shell.connect_to_primary('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.instance_address}')", "cluster = dba.get_cluster()", f'cluster.remove_router_metadata("{router_id}")', ] @@ -746,8 +739,12 @@ def initialize_juju_units_operations_table(self) -> None: initializing the juju_units_operations table """ initialize_table_commands = ( - "CREATE TABLE IF NOT EXISTS mysql.juju_units_operations (task varchar(20), executor varchar(20), status varchar(20), primary key(task))", - f"INSERT INTO mysql.juju_units_operations values ('{UNIT_TEARDOWN_LOCKNAME}', '', 'not-started') ON DUPLICATE KEY UPDATE executor = '', status = 'not-started'", + "CREATE TABLE IF NOT EXISTS mysql.juju_units_operations (task varchar(20), executor " + "varchar(20), status varchar(20), primary key(task))", + f"INSERT INTO mysql.juju_units_operations values ('{UNIT_TEARDOWN_LOCKNAME}', '', " + "'not-started') ON DUPLICATE KEY UPDATE executor = '', status = 'not-started'", + f"INSERT INTO mysql.juju_units_operations values ('{UNIT_ADD_LOCKNAME}', '', " + "'not-started') ON DUPLICATE KEY UPDATE executor = '', status = 'not-started'", ) try: @@ -788,12 +785,18 @@ def add_instance_to_cluster( "label": instance_unit_label, } + if not self._acquire_lock( + from_instance or self.instance_address, instance_unit_label, UNIT_ADD_LOCKNAME + ): + raise MySQLLockAcquisitionError("Lock not acquired") + connect_commands = ( ( f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}" f"@{from_instance or self.instance_address}')" ), f"cluster = dba.get_cluster('{self.cluster_name}')", + "shell.options['dba.restartWaitTimeout'] = 3600", ) for recovery_method in ["auto", "clone"]: @@ -816,11 +819,19 @@ def add_instance_to_cluster( f"Failed to add instance {instance_address} to cluster {self.cluster_name} on {self.instance_address}", exc_info=e, ) 
+ self._release_lock( + from_instance or self.instance_address, + instance_unit_label, + UNIT_ADD_LOCKNAME, + ) raise MySQLAddInstanceToClusterError(e.message) logger.debug( f"Failed to add instance {instance_address} to cluster {self.cluster_name} with recovery method 'auto'. Trying method 'clone'" ) + self._release_lock( + from_instance or self.instance_address, instance_unit_label, UNIT_ADD_LOCKNAME + ) def is_instance_configured_for_innodb( self, instance_address: str, instance_unit_label: str @@ -854,6 +865,34 @@ def is_instance_configured_for_innodb( ) return False + def are_locks_acquired(self, from_instance: Optional[str] = None) -> bool: + """Report if any topology change is being executed. + + Query the mysql.juju_units_operations table for any + in-progress lock for either unit add or removal. + + Args: + from_instance: member instance to run the command from (fallback to current one) + """ + commands = ( + ( + f"shell.connect('{self.server_config_user}:{self.server_config_password}" + f"@{from_instance or self.instance_address}')" + ), + "result = session.run_sql(\"SELECT COUNT(*) FROM mysql.juju_units_operations WHERE status='in-progress';\")", + "print(f'{result.fetch_one()[0]}')", + ) + try: + output = self._run_mysqlsh_script("\n".join(commands)) + except MySQLClientError: + # log error and fallback to assuming topology is changing + logger.exception("Failed to get locks count") + return True + + matches = re.search(r"(\d)", output) + + return int(matches.group(1)) > 0 if matches else False + def rescan_cluster( self, from_instance: Optional[str] = None, @@ -936,7 +975,30 @@ def get_cluster_status(self) -> Optional[dict]: output_dict = json.loads(output.lower()) return output_dict except MySQLClientError: - logger.exception(f"Failed to get cluster status for {self.cluster_name}") + logger.error(f"Failed to get cluster status for {self.cluster_name}") + + def get_cluster_node_count(self, from_instance: Optional[str] = None) -> int: + """Retrieve current count of cluster nodes. + + Returns: + Amount of cluster nodes. + """ + size_commands = ( + f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}" + f"@{from_instance or self.instance_address}')", + 'result = session.run_sql("SELECT COUNT(*) FROM performance_schema.replication_group_members")', + 'print(f"{result.fetch_one()[0]}")', + ) + + try: + output = self._run_mysqlsh_script("\n".join(size_commands)) + except MySQLClientError as e: + logger.warning("Failed to get node count", exc_info=e) + return 0 + + matches = re.search(r"(\d)", output) + + return int(matches.group(1)) if matches else 0 def get_cluster_endpoints(self, get_ips: bool = True) -> Tuple[str, str, str]: """Use get_cluster_status to return endpoints tuple. 
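
Taken together, the `get_cluster_node_count` and `are_locks_acquired` helpers added above are the primitives a waiting unit can use to decide whether to attempt `add_instance_to_cluster` or keep standing by. Below is a minimal sketch of that gating logic, assuming a `mysql` helper object that exposes the two new methods; it condenses the `_join_unit_to_cluster` handler introduced in the charm later in this patch.

import logging

logger = logging.getLogger(__name__)

GR_MAX_MEMBERS = 9  # Group Replication supports at most 9 members per group


def should_attempt_join(mysql, primary_address: str) -> bool:
    """Return True when a waiting unit may try add_instance_to_cluster() now.

    `mysql` is assumed to expose get_cluster_node_count() and
    are_locks_acquired() as added in this patch.
    """
    # Stand by if the group is already at the Group Replication member limit
    if mysql.get_cluster_node_count(from_instance=primary_address) >= GR_MAX_MEMBERS:
        logger.warning("Cluster is at max size; unit stays as standby")
        return False

    # Stand by while another unit-add or unit-teardown lock is in progress
    if mysql.are_locks_acquired(from_instance=primary_address):
        logger.debug("A topology change is in progress; waiting")
        return False

    return True

Even when both checks pass, add_instance_to_cluster() itself takes the unit-add lock and raises MySQLLockAcquisitionError if another unit won the race, so the caller simply stays in waiting status and retries on a later event.
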
@@ -1098,7 +1160,11 @@ def _acquire_lock(self, primary_address: str, unit_label: str, lock_name: str) - "print(f'{acquired_lock}')", ) - output = self._run_mysqlsh_script("\n".join(acquire_lock_commands)) + try: + output = self._run_mysqlsh_script("\n".join(acquire_lock_commands)) + except MySQLClientError: + logger.debug("Failed to acquire lock") + return False matches = re.search(r"(\d)", output) if not matches: return False @@ -1171,9 +1237,8 @@ def get_cluster_primary_address(self, connect_instance_address: str = None) -> O logger.debug(f"Getting cluster primary member's address from {connect_instance_address}") get_cluster_primary_commands = ( - f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{connect_instance_address}')", - f"cluster = dba.get_cluster('{self.cluster_name}')", - "primary_address = sorted([cluster_member['address'] for cluster_member in cluster.status()['defaultReplicaSet']['topology'].values() if cluster_member['mode'] == 'R/W'])[0]", + f"shell.connect_to_primary('{self.cluster_admin_user}:{self.cluster_admin_password}@{connect_instance_address}')", + "primary_address = shell.parse_uri(session.uri)['host']", "print(f'{primary_address}')", ) @@ -1256,12 +1321,8 @@ def grant_privileges_to_user( Raises: MySQLGrantPrivilegesToUserError if there is an issue granting privileges to a user """ - cluster_primary = self.get_cluster_primary_address() - if not cluster_primary: - raise MySQLGrantPrivilegesToUserError("Failed to get cluster primary address") - grant_privileges_commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{cluster_primary}')", + f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')", f"session.run_sql(\"GRANT {', '.join(privileges)} ON *.* TO '{username}'@'{hostname}'{' WITH GRANT OPTION' if with_grant_option else ''}\")", ) diff --git a/src/charm.py b/src/charm.py index eb3adebb4..142bc6c06 100755 --- a/src/charm.py +++ b/src/charm.py @@ -5,6 +5,7 @@ """Charm for MySQL.""" import logging +from socket import getfqdn from typing import Dict, Optional from charms.data_platform_libs.v0.s3 import S3Requirer @@ -16,9 +17,13 @@ MySQLConfigureInstanceError, MySQLConfigureMySQLUsersError, MySQLCreateClusterError, + MySQLGetClusterPrimaryAddressError, MySQLGetMemberStateError, MySQLGetMySQLVersionError, + MySQLInitializeJujuOperationsTableError, + MySQLLockAcquisitionError, MySQLRebootFromCompleteOutageError, + MySQLRescanClusterError, ) from charms.mysql.v0.tls import MySQLTLS from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider @@ -27,7 +32,6 @@ CharmBase, LeaderElectedEvent, RelationChangedEvent, - RelationJoinedEvent, UpdateStatusEvent, ) from ops.main import main @@ -46,6 +50,7 @@ CLUSTER_ADMIN_PASSWORD_KEY, CLUSTER_ADMIN_USERNAME, CONTAINER_NAME, + GR_MAX_MEMBERS, MONITORING_PASSWORD_KEY, MONITORING_USERNAME, MYSQL_LOG_FILES, @@ -93,7 +98,6 @@ def __init__(self, *args): self.framework.observe(self.on.config_changed, self._on_config_changed) self.framework.observe(self.on.update_status, self._on_update_status) - self.framework.observe(self.on[PEER].relation_joined, self._on_peer_relation_joined) self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed) # Actions events @@ -170,7 +174,6 @@ def _is_peer_data_set(self): and self.get_secret("app", CLUSTER_ADMIN_PASSWORD_KEY) and self.get_secret("app", MONITORING_PASSWORD_KEY) and self.get_secret("app", BACKUPS_PASSWORD_KEY) - and 
self.app_peer_data.get("allowlist") ) @property @@ -239,7 +242,7 @@ def get_unit_hostname(self, unit_name: Optional[str] = None) -> str: unit_name = unit_name or self.unit.name return f"{unit_name.replace('/', '-')}.{self.app.name}-endpoints" - def _get_unit_fqdn(self, unit_name: str) -> str: + def _get_unit_fqdn(self, unit_name: Optional[str] = None) -> str: """Create a fqdn for a unit. Translate juju unit name to resolvable hostname. @@ -249,7 +252,7 @@ def _get_unit_fqdn(self, unit_name: str) -> str: Returns: A string representing the fqdn of the unit. """ - return f"{self.get_unit_hostname(unit_name)}.{self.model.name}.svc.cluster.local" + return getfqdn(self.get_unit_hostname(unit_name)) def get_secret(self, scope: str, key: str) -> Optional[str]: """Get secret from the secret storage.""" @@ -283,6 +286,180 @@ def is_unit_busy(self) -> bool: """Returns whether the unit is busy.""" return self._is_cluster_blocked() + def _prepare_configs(self, container: Container) -> bool: + """Copies files to the workload container. + + Meant to be called from the pebble-ready handler. + + Returns: a boolean indicating if the method was successful. + """ + if container.exists(MYSQLD_CONFIG_FILE): + return True + try: + ( + innodb_buffer_pool_size, + innodb_buffer_pool_chunk_size, + ) = self._mysql.get_innodb_buffer_pool_parameters() + except MySQLGetInnoDBBufferPoolParametersError: + self.unit.status = BlockedStatus("Error computing innodb_buffer_pool_size") + return False + + try: + self._mysql.create_custom_config_file( + report_host=self._get_unit_fqdn(self.unit.name), + innodb_buffer_pool_size=innodb_buffer_pool_size, + innodb_buffer_pool_chunk_size=innodb_buffer_pool_chunk_size, + ) + except MySQLCreateCustomConfigFileError: + self.unit.status = BlockedStatus("Failed to copy custom mysql config file") + return False + + return True + + def _get_primary_from_online_peer(self) -> Optional[str]: + """Get the primary address from an online peer.""" + for unit in self.peers.units: + if self.peers.data[unit].get("member-state") == "online": + try: + return self._mysql.get_cluster_primary_address( + connect_instance_address=self._get_unit_fqdn(unit.name), + ) + except MySQLGetClusterPrimaryAddressError: + # try next unit + continue + + def _is_unit_waiting_to_join_cluster(self) -> bool: + """Return if the unit is waiting to join the cluster.""" + # alternatively, we could check if the instance is configured + # and have an empty performance_schema.replication_group_members table + return ( + self.unit_peer_data.get("member-state") == "waiting" + and self.unit_peer_data.get("unit-configured") == "True" + and not self.unit_peer_data.get("unit-initialized") + ) + + def _join_unit_to_cluster(self) -> None: + """Join the unit to the cluster. + + Try to join the unit from the primary unit. + """ + instance_label = self.unit.name.replace("/", "-") + instance_fqdn = self._get_unit_fqdn(self.unit.name) + + if self._mysql.is_instance_in_cluster(instance_label): + logger.debug("instance already in cluster") + return + + # Add new instance to the cluster + try: + cluster_primary = self._get_primary_from_online_peer() + if not cluster_primary: + self.unit.status = WaitingStatus("waiting to get cluster primary from peers") + logger.debug("waiting: unable to retrieve the cluster primary from peers") + return + + if self._mysql.get_cluster_node_count(from_instance=cluster_primary) == GR_MAX_MEMBERS: + self.unit.status = WaitingStatus( + f"Cluster reached max size of {GR_MAX_MEMBERS} units. Standby." 
+ ) + logger.warning( + f"Cluster reached max size of {GR_MAX_MEMBERS} units. This unit will stay as standby." + ) + return + + if self._mysql.are_locks_acquired(from_instance=cluster_primary): + self.unit.status = WaitingStatus("waiting to join the cluster") + logger.debug("waiting: cluster lock is held") + return + + self.unit.status = MaintenanceStatus("joining the cluster") + + self._mysql.add_instance_to_cluster( + instance_fqdn, instance_label, from_instance=cluster_primary + ) + logger.debug(f"Added instance {instance_fqdn} to cluster") + + # Update 'units-added-to-cluster' counter in the peer relation databag + self.unit_peer_data["unit-initialized"] = "True" + self.unit_peer_data["member-state"] = "online" + self.unit.status = ActiveStatus(self.active_status_message) + logger.debug(f"Instance {instance_label} is cluster member") + + except MySQLAddInstanceToClusterError: + logger.debug(f"Unable to add instance {instance_fqdn} to cluster.") + except MySQLLockAcquisitionError: + self.unit.status = WaitingStatus("waiting to join the cluster") + logger.debug("waiting: failed to acquire lock when adding instance to cluster") + + def _remove_scaled_down_units(self) -> None: + """Remove scaled down units from the cluster.""" + current_units = 1 + len(self.peers.units) + cluster_status = self._mysql.get_cluster_status() + if not cluster_status: + self.unit.status = BlockedStatus("Failed to get cluster status") + return + + try: + addresses_of_units_to_remove = [ + member["address"] + for unit_name, member in cluster_status["defaultreplicaset"]["topology"].items() + if int(unit_name.split("-")[-1]) >= current_units + ] + except ValueError: + # exception can occur if unit is not yet labeled + return + + if not addresses_of_units_to_remove: + return + + self.unit.status = MaintenanceStatus("Removing scaled down units from cluster") + + for unit_address in addresses_of_units_to_remove: + try: + self._mysql.force_remove_unit_from_cluster(unit_address) + except MySQLForceRemoveUnitFromClusterError: + self.unit.status = BlockedStatus("Failed to remove scaled down unit from cluster") + return + self.unit.status = ActiveStatus(self.active_status_message) + + def _reconcile_pebble_layer(self, container: Container) -> None: + """Reconcile pebble layer.""" + current_layer = container.get_plan() + new_layer = self._pebble_layer + + if new_layer.services != current_layer.services: + logger.info("Adding pebble layer") + + container.add_layer(MYSQLD_SAFE_SERVICE, new_layer, combine=True) + container.replan() + self._mysql.wait_until_mysql_connection() + self._on_update_status(None) + + def _rescan_cluster(self) -> None: + """Rescan the cluster topology.""" + try: + primary_address = self._mysql.get_cluster_primary_address() + except MySQLGetClusterPrimaryAddressError: + return + + if not primary_address: + return + + # Set active status when primary is known + self.app.status = ActiveStatus() + + if self._mysql.are_locks_acquired(from_instance=primary_address): + logger.debug("Skip cluster rescan while locks are acquired") + return + + # Only rescan cluster when topology is not changing + try: + self._mysql.rescan_cluster( + remove_instances=True, add_instances=True, from_instance=primary_address + ) + except MySQLRescanClusterError: + logger.warning("Failed to rescan cluster") + # ========================================================================= # Charm event handlers # ========================================================================= @@ -299,16 +476,10 @@ def _on_config_changed(self, _) 
-> None: self.config.get("cluster-name") or f"cluster_{generate_random_hash()}" ) - # initialise allowlist with leader hostname - if not self.app_peer_data.get("allowlist"): - self.app_peer_data["allowlist"] = f"{self._get_unit_fqdn(self.unit.name)}" - def _on_leader_elected(self, _: LeaderElectedEvent) -> None: """Handle the leader elected event. Set config values in the peer relation databag if not already set. - Idempotently remove unreachable instances from the cluster and update - the allowlist accordingly. """ # Set required passwords if not already set required_passwords = [ @@ -352,6 +523,7 @@ def _configure_instance(self, container) -> bool: # Restart exporter service after configuration container.restart(MYSQLD_EXPORTER_SERVICE) + self.unit_peer_data["instance-hostname"] = self._get_unit_fqdn(self.unit.name) self.unit_peer_data["unit-configured"] = "True" except ( MySQLConfigureInstanceError, @@ -372,35 +544,6 @@ def _configure_instance(self, container) -> bool: return True - def _prepare_configs(self, container: Container) -> bool: - """Copies files to the workload container. - - Meant to be called from the pebble-ready handler. - - Returns: a boolean indicating if the method was successful. - """ - if not container.exists(MYSQLD_CONFIG_FILE): - try: - ( - innodb_buffer_pool_size, - innodb_buffer_pool_chunk_size, - ) = self._mysql.get_innodb_buffer_pool_parameters() - except MySQLGetInnoDBBufferPoolParametersError: - self.unit.status = BlockedStatus("Error computing innodb_buffer_pool_size") - return False - - try: - self._mysql.create_custom_config_file( - report_host=self._get_unit_fqdn(self.unit.name), - innodb_buffer_pool_size=innodb_buffer_pool_size, - innodb_buffer_pool_chunk_size=innodb_buffer_pool_chunk_size, - ) - except MySQLCreateCustomConfigFileError: - self.unit.status = BlockedStatus("Failed to copy custom mysql config file") - return False - - return True - def _mysql_pebble_ready_checks(self, event) -> bool: """Executes some checks to see if it is safe to execute the pebble ready handler.""" if not self._is_peer_data_set: @@ -432,17 +575,7 @@ def _on_mysql_pebble_ready(self, event) -> None: self.unit.status = MaintenanceStatus("Initialising mysqld") if self.unit_peer_data.get("unit-configured"): # Only update pebble layer if unit is already configured for GR - current_layer = container.get_plan() - new_layer = self._pebble_layer - - if new_layer.services != current_layer: - logger.info("Adding pebble layer") - - container.add_layer(MYSQLD_SAFE_SERVICE, new_layer, combine=True) - container.restart(MYSQLD_SAFE_SERVICE) - self._mysql.wait_until_mysql_connection() - self._on_update_status(None) - + self._reconcile_pebble_layer(container) return # First run setup @@ -454,6 +587,8 @@ def _on_mysql_pebble_ready(self, event) -> None: # Non-leader units should wait for leader to add them to the cluster self.unit.status = WaitingStatus("Waiting for instance to join the cluster") self.unit_peer_data.update({"member-role": "secondary", "member-state": "waiting"}) + + self._join_unit_to_cluster() return try: @@ -462,7 +597,8 @@ def _on_mysql_pebble_ready(self, event) -> None: unit_label = self.unit.name.replace("/", "-") self._mysql.create_cluster(unit_label) - # Create control file in data directory + self._mysql.initialize_juju_units_operations_table() + # Start control flag self.app_peer_data["units-added-to-cluster"] = "1" state, role = self._mysql.get_member_state() @@ -477,6 +613,8 @@ def _on_mysql_pebble_ready(self, event) -> None: logger.debug("Unable to create 
cluster: {}".format(e)) except MySQLGetMemberStateError: self.unit.status = BlockedStatus("Unable to query member state and role") + except MySQLInitializeJujuOperationsTableError: + self.unit.status = BlockedStatus("Failed to initialize juju operations table") def _handle_potential_cluster_crash_scenario(self) -> bool: """Handle potential full cluster crash scenarios. @@ -549,12 +687,18 @@ def _is_cluster_blocked(self) -> bool: return False - def _on_update_status(self, event: UpdateStatusEvent) -> None: + def _on_update_status(self, _: Optional[UpdateStatusEvent]) -> None: """Handle the update status event. One purpose of this event handler is to ensure that scaled down units are removed from the cluster. """ + if not self.unit.is_leader() and self._is_unit_waiting_to_join_cluster(): + # join cluster test takes precedence over blocked test + # due to matching criteria + self._join_unit_to_cluster() + return + if self._is_cluster_blocked(): return @@ -568,84 +712,13 @@ def _on_update_status(self, event: UpdateStatusEvent) -> None: if not self.unit.is_leader(): return - current_units = 1 + len(self.peers.units) - - cluster_status = self._mysql.get_cluster_status() - if not cluster_status: - self.unit.status = BlockedStatus("Failed to get cluster status") - return - - addresses_of_units_to_remove = [ - member["address"] - for unit_name, member in cluster_status["defaultreplicaset"]["topology"].items() - if int(unit_name.split("-")[-1]) >= current_units - ] - - if not addresses_of_units_to_remove: - return - - self.unit.status = MaintenanceStatus("Removing scaled down units from cluster") - - for unit_address in addresses_of_units_to_remove: - try: - self._mysql.force_remove_unit_from_cluster(unit_address) - except MySQLForceRemoveUnitFromClusterError: - self.unit.status = BlockedStatus("Failed to remove scaled down unit from cluster") - return - - self.unit.status = ActiveStatus(self.active_status_message) - - def _on_peer_relation_joined(self, event: RelationJoinedEvent): - """Handle the peer relation joined event.""" - # Only leader unit add instances to the cluster - if not self.unit.is_leader(): - return - - # only add other units - if event.unit.name == self.unit.name: - return - - # Defer run when leader is not active - if not isinstance(self.unit.status, ActiveStatus): - event.defer() - return - - new_instance_fqdn = self._get_unit_fqdn(event.unit.name) - new_instance_label = event.unit.name.replace("/", "-") - - # Check if new instance is ready to be added to the cluster - if not self._mysql.is_instance_configured_for_innodb( - new_instance_fqdn, new_instance_label - ): - event.defer() - return - - # Check if instance was already added to the cluster - if self._mysql.is_instance_in_cluster(new_instance_label): - logger.debug(f"Instance {new_instance_fqdn} already in cluster") - return - - # Add new instance to the cluster - try: - cluster_primary = self._mysql.get_cluster_primary_address() - if not cluster_primary: - self.unit.status = BlockedStatus("Unable to retrieve the cluster primary") - return - - self._mysql.add_instance_to_cluster( - new_instance_fqdn, new_instance_label, from_instance=cluster_primary - ) - logger.debug(f"Added instance {new_instance_fqdn} to cluster") + nodes = self._mysql.get_cluster_node_count() + if nodes > 0: + self.app_peer_data["units-added-to-cluster"] = str(nodes) - # Update 'units-added-to-cluster' counter in the peer relation databag - # in order to trigger a relation_changed event which will move the added unit - # into ActiveStatus - 
units_started = int(self.app_peer_data["units-added-to-cluster"]) - self.app_peer_data["units-added-to-cluster"] = str(units_started + 1) - - except MySQLAddInstanceToClusterError: - logger.debug(f"Unable to add instance {new_instance_fqdn} to cluster.") - self.unit.status = BlockedStatus("Unable to add instance to cluster") + # Check if there are any scaled down units that need to be removed from the cluster + self._remove_scaled_down_units() + self._rescan_cluster() def _on_peer_relation_changed(self, event: RelationChangedEvent) -> None: """Handle the relation changed event.""" @@ -656,15 +729,8 @@ def _on_peer_relation_changed(self, event: RelationChangedEvent) -> None: event.defer() return - instance_label = self.unit.name.replace("/", "-") - # Test if non-leader unit is ready - if isinstance(self.unit.status, WaitingStatus) and self._mysql.is_instance_in_cluster( - instance_label - ): - self.unit_peer_data["unit-initialized"] = "True" - self.unit_peer_data["member-state"] = "online" - self.unit.status = ActiveStatus(self.active_status_message) - logger.debug(f"Instance {instance_label} is cluster member") + if self._is_unit_waiting_to_join_cluster(): + self._join_unit_to_cluster() # ========================================================================= # Charm action handlers diff --git a/src/constants.py b/src/constants.py index 757fb682a..bd6a8a709 100644 --- a/src/constants.py +++ b/src/constants.py @@ -51,3 +51,4 @@ MYSQLD_DEFAULTS_CONFIG_FILE = "/etc/mysql/my.cnf" MYSQLD_EXPORTER_PORT = "9104" MYSQLD_EXPORTER_SERVICE = "mysqld_exporter" +GR_MAX_MEMBERS = 9 diff --git a/src/mysql_k8s_helpers.py b/src/mysql_k8s_helpers.py index 6f10b9adc..1c18c9e4f 100644 --- a/src/mysql_k8s_helpers.py +++ b/src/mysql_k8s_helpers.py @@ -477,10 +477,11 @@ def create_database(self, database_name: str) -> None: MySQLCreateDatabaseError if there is an issue creating specified database """ try: - primary_address = self.get_cluster_primary_address() - create_database_commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')", + ( + f"shell.connect_to_primary('{self.server_config_user}:" + f"{self.server_config_password}@{self.instance_address}')" + ), f'session.run_sql("CREATE DATABASE IF NOT EXISTS `{database_name}`;")', ) @@ -502,12 +503,16 @@ def create_user(self, username: str, password: str, label: str, hostname: str = MySQLCreateUserError if there is an issue creating specified user """ try: - primary_address = self.get_cluster_primary_address() - escaped_user_attributes = json.dumps({"label": label}).replace('"', r"\"") create_user_commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')", - f"session.run_sql(\"CREATE USER `{username}`@`{hostname}` IDENTIFIED BY '{password}' ATTRIBUTE '{escaped_user_attributes}';\")", + ( + f"shell.connect_to_primary('{self.server_config_user}:" + f"{self.server_config_password}@{self.instance_address}')" + ), + ( + f'session.run_sql("CREATE USER `{username}`@`{hostname}` IDENTIFIED' + f" BY '{password}' ATTRIBUTE '{escaped_user_attributes}';\")" + ), ) self._run_mysqlsh_script("\n".join(create_user_commands)) @@ -539,10 +544,11 @@ def escalate_user_privileges(self, username: str, hostname: str = "%") -> None: "CONNECTION_ADMIN", ) - primary_address = self.get_cluster_primary_address() - escalate_user_privileges_commands = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')", + ( + 
f"shell.connect_to_primary('{self.server_config_user}:" + f"{self.server_config_password}@{self.instance_address}')" + ), f'session.run_sql("GRANT ALL ON *.* TO `{username}`@`{hostname}` WITH GRANT OPTION;")', f"session.run_sql(\"REVOKE {', '.join(super_privileges_to_revoke)} ON *.* FROM `{username}`@`{hostname}`;\")", 'session.run_sql("FLUSH PRIVILEGES;")', @@ -585,11 +591,12 @@ def delete_users_with_label(self, label_name: str, label_value: str) -> None: logger.debug(f"There are no users to drop for label {label_name}={label_value}") return - primary_address = self.get_cluster_primary_address() - # Using server_config_user as we are sure it has drop user grants drop_users_command = ( - f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')", + ( + f"shell.connect_to_primary('{self.server_config_user}:" + f"{self.server_config_password}@{self.instance_address}')" + ), f"session.run_sql(\"DROP USER IF EXISTS {', '.join(users)};\")", ) self._run_mysqlsh_script("\n".join(drop_users_command)) @@ -810,6 +817,11 @@ def _get_total_memory(self) -> int: container_limits = self.k8s_helper.get_resources_limits(CONTAINER_NAME) if "memory" in container_limits: mem_str = container_limits["memory"] + logger.debug(f"Memory constrained to {mem_str} from resource limit") return any_memory_to_bytes(mem_str) + # Test validation + # Unset limits from k8s + return 796917760 + return super()._get_total_memory() diff --git a/src/relations/mysql_provider.py b/src/relations/mysql_provider.py index 7ac91036e..774847bb6 100644 --- a/src/relations/mysql_provider.py +++ b/src/relations/mysql_provider.py @@ -20,14 +20,9 @@ MySQLGrantPrivilegesToUserError, MySQLRemoveRouterFromMetadataError, ) -from ops.charm import ( - PebbleReadyEvent, - RelationBrokenEvent, - RelationDepartedEvent, - RelationJoinedEvent, -) +from ops.charm import PebbleReadyEvent, RelationBrokenEvent, RelationDepartedEvent from ops.framework import Object -from ops.model import BlockedStatus +from ops.model import ActiveStatus, BlockedStatus from constants import ( CONTAINER_NAME, @@ -61,10 +56,6 @@ def __init__(self, charm) -> None: self._on_database_provides_relation_departed, ) - self.framework.observe( - self.charm.on[PEER].relation_departed, self._on_peer_relation_departed - ) - self.framework.observe(self.charm.on[PEER].relation_joined, self._on_peer_relation_joined) self.framework.observe(self.charm.on[PEER].relation_changed, self._configure_endpoints) self.framework.observe(self.charm.on.leader_elected, self._configure_endpoints) self.framework.observe(self.charm.on.mysql_pebble_ready, self._on_mysql_pebble_ready) @@ -146,8 +137,7 @@ def _update_pod_endpoint(self) -> None: logger.debug(f"Labeling pod {pod} with label {label}") self.charm.k8s_helpers.label_pod(label, pod) except KubernetesClientError: - logger.debug("Can't update pod labels") - self.charm.unit.status = BlockedStatus("Can't update pod label") + logger.error("Error updating pod label. Traffic may not be properly routed.") # ============= # Handlers @@ -232,7 +222,7 @@ def _on_database_requested(self, event: DatabaseRequestedEvent) -> None: ) event.defer() - def _on_mysql_pebble_ready(self, event: PebbleReadyEvent) -> None: + def _on_mysql_pebble_ready(self, _: PebbleReadyEvent) -> None: """Handle the mysql pebble ready event. 
Update a value in the peer app databag to trigger the peer_relation_changed @@ -240,16 +230,18 @@ def _on_mysql_pebble_ready(self, event: PebbleReadyEvent) -> None: """ container = self.charm.unit.get_container(CONTAINER_NAME) if not container.can_connect(): - event.defer() return - if not self.charm.cluster_initialized: + relations = self.charm.model.relations.get(DB_RELATION_NAME) + if not self.charm.cluster_initialized and not relations: + return + + if not isinstance(self.charm.unit.status, ActiveStatus): return charm_unit_label = self.charm.unit.name.replace("/", "-") if not self.charm._mysql.is_instance_in_cluster(charm_unit_label): logger.debug(f"Unit {self.charm.unit.name} is not yet a member of the cluster") - event.defer() return container_restarts = int(self.charm.unit_peer_data.get(CONTAINER_RESTARTS, "0")) @@ -257,57 +249,10 @@ def _on_mysql_pebble_ready(self, event: PebbleReadyEvent) -> None: self._configure_endpoints(None) - def _on_peer_relation_joined(self, event: RelationJoinedEvent) -> None: - """Handle the peer relation joined event. - - Update the endpoints + read_only_endpoints. - """ - relations = self.charm.model.relations.get(DB_RELATION_NAME, []) - if not relations or not self.charm.cluster_initialized: - return - - event_unit_label = event.unit.name.replace("/", "-") - if not self.charm._mysql.is_instance_in_cluster(event_unit_label): - logger.debug(f"Unit {event.unit.name} is not yet a member of the cluster") - event.defer() - return - - relation_data = self.database.fetch_relation_data() - for relation in relations: - # only update endpoints if on_database_requested has executed - if relation.id not in relation_data: - continue - - self._update_pod_endpoint() - - def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: - """Handle the peer relation departed event. - - Update the endpoints + read_only_endpoints. 
- """ - relations = self.charm.model.relations.get(DB_RELATION_NAME, []) - if not relations or not self.charm.cluster_initialized: - return - - departing_unit_name = event.departing_unit.name.replace("/", "-") - - if self.charm._mysql.is_instance_in_cluster(departing_unit_name): - logger.debug(f"Departing unit {departing_unit_name} still in cluster") - event.defer() - return - - relation_data = self.database.fetch_relation_data() - for relation in relations: - # only update endpoints if on_database_requested has executed - if relation.id not in relation_data: - continue - - self._update_pod_endpoint() - def _configure_endpoints(self, _) -> None: """Update the endpoints + read_only_endpoints.""" relations = self.charm.model.relations.get(DB_RELATION_NAME, []) - if not relations or not self.charm.cluster_initialized: + if not relations or not self.charm.unit_initialized: return relation_data = self.database.fetch_relation_data() diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 8f5f3dccf..786285593 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -4,7 +4,8 @@ import json import os -import subprocess + +# import subprocess from pathlib import Path import pytest @@ -27,9 +28,9 @@ async def build_charm(charm_path, bases_index: int = None) -> Path: ops_test.build_charm = build_charm - subprocess.run( - ["juju", "set-model-constraints", "--model", ops_test.model.info.name, "mem=500M"], - check=True, - ) + # subprocess.run( + # ["juju", "set-model-constraints", "--model", ops_test.model.info.name, "mem=720M"], + # check=True, + # ) return ops_test diff --git a/tests/integration/high_availability/test_replication.py b/tests/integration/high_availability/test_replication.py index 4f68da2b2..557613a2d 100644 --- a/tests/integration/high_availability/test_replication.py +++ b/tests/integration/high_availability/test_replication.py @@ -66,6 +66,7 @@ async def test_no_replication_across_clusters(ops_test: OpsTest, continuous_writ ops_test, check_for_existing_application=False, mysql_application_name=another_mysql_application_name, + num_units=1, ) # insert some data into the first/original mysql cluster diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 774965c1b..1633b2593 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -139,9 +139,6 @@ async def test_scale_up_and_down(ops_test: OpsTest) -> None: async with ops_test.fast_forward(): random_unit = ops_test.model.applications[APP_NAME].units[0] - # TODO: address the flakiness of scaling up multiple units at once - # For now, we'll scale up one at a time to make tests reliably pass - await scale_application(ops_test, APP_NAME, 4) await scale_application(ops_test, APP_NAME, 5) cluster_status = await get_cluster_status(ops_test, random_unit) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 808fbeece..1e6b982f3 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -4,7 +4,7 @@ # Learn more about testing at: https://juju.is/docs/sdk/testing import unittest -from unittest.mock import MagicMock, PropertyMock, patch +from unittest.mock import PropertyMock, patch from ops.model import ActiveStatus, BlockedStatus, WaitingStatus from ops.testing import Harness @@ -80,6 +80,7 @@ def test_on_leader_elected(self): peer_data[password].isalnum() and len(peer_data[password]) == PASSWORD_LENGTH ) + @patch("mysql_k8s_helpers.MySQL.initialize_juju_units_operations_table") 
@patch("mysql_k8s_helpers.MySQL.safe_stop_mysqld_safe") @patch("mysql_k8s_helpers.MySQL.get_mysql_version", return_value="8.0.0") @patch("mysql_k8s_helpers.MySQL.wait_until_mysql_connection") @@ -108,6 +109,7 @@ def test_mysql_pebble_ready( _wait_until_mysql_connection, _get_mysql_version, _safe_stop_mysqld_safe, + _initialize_juju_units_operations_table, ): # Check if initial plan is empty self.harness.set_can_connect("mysql", True) @@ -185,23 +187,6 @@ def test_mysql_property(self): mysql = self.charm._mysql self.assertTrue(isinstance(mysql, MySQL)) - @patch("charm.MySQLOperatorCharm._mysql") - def test_on_peer_relation_joined(self, _mysql_mock): - # Test basic peer relation joined calls - self.harness.set_leader() - event = MagicMock() - event.unit.name.return_value = f"{APP_NAME}/2" - self.charm._mysql = _mysql_mock - - _mysql_mock.is_instance_configured_for_innodb.return_value = True - - self.charm._on_peer_relation_joined(event) - - _mysql_mock.add_instance_to_cluster.called_once_with("mysql-k8s-endpoints.mysql-k8s-2") - _mysql_mock.is_instance_configured_for_innodb.called_once_with( - "mysql-k8s-endpoints.mysql-k8s-2" - ) - # @patch_network_get(private_address="1.1.1.1") @patch("charm.MySQLOperatorCharm._on_leader_elected") def test_get_secret(self, _): diff --git a/tests/unit/test_mysql_k8s_helpers.py b/tests/unit/test_mysql_k8s_helpers.py index dd32cc38b..11aae6fc8 100644 --- a/tests/unit/test_mysql_k8s_helpers.py +++ b/tests/unit/test_mysql_k8s_helpers.py @@ -138,12 +138,11 @@ def test_configure_mysql_users_exception(self, _run_mysqlcli_script): with self.assertRaises(MySQLConfigureMySQLUsersError): self.mysql.configure_mysql_users() - @patch("mysql_k8s_helpers.MySQL.get_cluster_primary_address", return_value="1.1.1.1:3306") @patch("mysql_k8s_helpers.MySQL._run_mysqlsh_script") - def test_create_database(self, _run_mysqlsh_script, _get_cluster_primary_address): + def test_create_database(self, _run_mysqlsh_script): """Test successful execution of create_database.""" _expected_create_database_commands = ( - "shell.connect('serverconfig:serverconfigpassword@1.1.1.1:3306')", + "shell.connect_to_primary('serverconfig:serverconfigpassword@127.0.0.1')", 'session.run_sql("CREATE DATABASE IF NOT EXISTS `test_database`;")', ) @@ -151,22 +150,20 @@ def test_create_database(self, _run_mysqlsh_script, _get_cluster_primary_address _run_mysqlsh_script.assert_called_once_with("\n".join(_expected_create_database_commands)) - @patch("mysql_k8s_helpers.MySQL.get_cluster_primary_address", return_value="1.1.1.1:3306") @patch("mysql_k8s_helpers.MySQL._run_mysqlsh_script") - def test_create_database_exception(self, _run_mysqlsh_script, _get_cluster_primary_address): + def test_create_database_exception(self, _run_mysqlsh_script): """Test exception while executing create_database.""" _run_mysqlsh_script.side_effect = MySQLClientError("Error creating database") with self.assertRaises(MySQLCreateDatabaseError): self.mysql.create_database("test_database") - @patch("mysql_k8s_helpers.MySQL.get_cluster_primary_address", return_value="1.1.1.1:3306") @patch("mysql_k8s_helpers.MySQL._run_mysqlsh_script") - def test_create_user(self, _run_mysqlsh_script, _get_cluster_primary_address): + def test_create_user(self, _run_mysqlsh_script): """Test successful execution of create_user.""" _escaped_attributes = json.dumps({"label": "test_label"}).replace('"', r"\"") _expected_create_user_commands = ( - "shell.connect('serverconfig:serverconfigpassword@1.1.1.1:3306')", + 
"shell.connect_to_primary('serverconfig:serverconfigpassword@127.0.0.1')", f"session.run_sql(\"CREATE USER `test_user`@`%` IDENTIFIED BY 'test_password' ATTRIBUTE '{_escaped_attributes}';\")", ) @@ -174,18 +171,16 @@ def test_create_user(self, _run_mysqlsh_script, _get_cluster_primary_address): _run_mysqlsh_script.assert_called_once_with("\n".join(_expected_create_user_commands)) - @patch("mysql_k8s_helpers.MySQL.get_cluster_primary_address", return_value="1.1.1.1:3306") @patch("mysql_k8s_helpers.MySQL._run_mysqlsh_script") - def test_create_user_exception(self, _run_mysqlsh_script, _get_cluster_primary_address): + def test_create_user_exception(self, _run_mysqlsh_script): """Test exception while executing create_user.""" _run_mysqlsh_script.side_effect = MySQLClientError("Error creating user") with self.assertRaises(MySQLCreateUserError): self.mysql.create_user("test_user", "test_password", "test_label") - @patch("mysql_k8s_helpers.MySQL.get_cluster_primary_address", return_value="1.1.1.1:3306") @patch("mysql_k8s_helpers.MySQL._run_mysqlsh_script") - def test_escalate_user_privileges(self, _run_mysqlsh_script, _get_cluster_primary_address): + def test_escalate_user_privileges(self, _run_mysqlsh_script): """Test successful execution of escalate_user_privileges.""" super_privileges_to_revoke = ( "SYSTEM_USER", @@ -201,7 +196,7 @@ def test_escalate_user_privileges(self, _run_mysqlsh_script, _get_cluster_primar ) _expected_escalate_user_privileges_commands = ( - "shell.connect('serverconfig:serverconfigpassword@1.1.1.1:3306')", + "shell.connect_to_primary('serverconfig:serverconfigpassword@127.0.0.1')", 'session.run_sql("GRANT ALL ON *.* TO `test_user`@`%` WITH GRANT OPTION;")', f"session.run_sql(\"REVOKE {', '.join(super_privileges_to_revoke)} ON *.* FROM `test_user`@`%`;\")", 'session.run_sql("FLUSH PRIVILEGES;")', @@ -213,23 +208,17 @@ def test_escalate_user_privileges(self, _run_mysqlsh_script, _get_cluster_primar "\n".join(_expected_escalate_user_privileges_commands) ) - @patch("mysql_k8s_helpers.MySQL.get_cluster_primary_address", return_value="1.1.1.1:3306") @patch("mysql_k8s_helpers.MySQL._run_mysqlsh_script") - def test_escalate_user_privileges_exception( - self, _run_mysqlsh_script, _get_cluster_primary_address - ): + def test_escalate_user_privileges_exception(self, _run_mysqlsh_script): """Test exception while executing escalate_user_privileges.""" _run_mysqlsh_script.side_effect = MySQLClientError("Error escalating user privileges") with self.assertRaises(MySQLEscalateUserPrivilegesError): self.mysql.escalate_user_privileges("test_user") - @patch("mysql_k8s_helpers.MySQL.get_cluster_primary_address", return_value="1.1.1.1:3306") @patch("mysql_k8s_helpers.MySQL._run_mysqlcli_script") @patch("mysql_k8s_helpers.MySQL._run_mysqlsh_script") - def test_delete_users_with_label( - self, _run_mysqlsh_script, _run_mysqlcli_script, _get_cluster_primary_address - ): + def test_delete_users_with_label(self, _run_mysqlsh_script, _run_mysqlcli_script): """Test successful execution of delete_users_with_label.""" _expected_get_label_users_commands = ( "SELECT CONCAT(user.user, '@', user.host) FROM mysql.user AS user " @@ -241,7 +230,7 @@ def test_delete_users_with_label( _run_mysqlcli_script.return_value = "users\ntest_user@%\ntest_user_2@localhost" _expected_drop_users_commands = ( - "shell.connect('serverconfig:serverconfigpassword@1.1.1.1:3306')", + "shell.connect_to_primary('serverconfig:serverconfigpassword@127.0.0.1')", "session.run_sql(\"DROP USER IF EXISTS 'test_user'@'%', 
'test_user_2'@'localhost';\")", ) @@ -254,12 +243,9 @@ def test_delete_users_with_label( ) _run_mysqlsh_script.assert_called_once_with("\n".join(_expected_drop_users_commands)) - @patch("mysql_k8s_helpers.MySQL.get_cluster_primary_address", return_value="1.1.1.1:3306") @patch("mysql_k8s_helpers.MySQL._run_mysqlcli_script") @patch("mysql_k8s_helpers.MySQL._run_mysqlsh_script") - def test_delete_users_with_label_exception( - self, _run_mysqlsh_script, _run_mysqlcli_script, _get_cluster_primary_address - ): + def test_delete_users_with_label_exception(self, _run_mysqlsh_script, _run_mysqlcli_script): """Test exception while executing delete_users_with_label.""" _run_mysqlcli_script.side_effect = MySQLClientError("Error getting label users")
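
The unit tests above all assert the same connection preamble, which is the thread running through this patch: every administrative mysqlsh script now opens its session with shell.connect_to_primary() against the local instance instead of first resolving the primary address and shell.connect()-ing to it. Below is a small sketch of how such a script is composed; the credentials and address are placeholders, and build_admin_script is a hypothetical helper standing in for the command tuples the charm builds inline and hands to _run_mysqlsh_script.

def build_admin_script(user: str, password: str, instance_address: str, *statements: str) -> str:
    """Compose a mysqlsh script that runs SQL statements against the cluster primary."""
    # Open the session on the primary, resolved from the local instance
    commands = [f"shell.connect_to_primary('{user}:{password}@{instance_address}')"]
    # Each SQL statement becomes one session.run_sql(...) call
    commands.extend(f'session.run_sql("{statement}")' for statement in statements)
    return "\n".join(commands)


script = build_admin_script(
    "serverconfig",
    "serverconfigpassword",
    "127.0.0.1",
    "CREATE DATABASE IF NOT EXISTS `test_database`;",
)
print(script)
# The charm would pass this string to MySQL._run_mysqlsh_script() for execution.

Because shell.connect_to_primary() resolves the primary through the cluster metadata itself, the separate get_cluster_primary_address() round-trip and its failure handling could be dropped from each caller, which is what most of the library changes in this patch amount to.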