Skip to content

Commit

Permalink
Fix 9+ members in waiting and cluster join refactoring (#229)
Browse files Browse the repository at this point in the history
* initial refactor

* won't fail when failed to acquire lock

* fixes and remove deprecated handler

* extracted function to decrease complexity

* extracted function to minimize complexity

* minimal adjustment to pass tests

* removed unused import

* better messaging

* decrease secondary cluster node count to speed up test

* pr comments

* deprecated key

* using new function

* fix missing cleanup

* extract function

* s/block/wait on max size and replan to start all services

* remove redundant handlers avoid race conditions

* optimize db access calls

* consolidate calls
* minimize overhead of pebble calls

* fix tests

* tests require more memory

* temp ci debug

* updated and extended optimization approach

* re-add code lost on merge from main

* change test criteria to be more broad

Test pass with juju 2.9.29 and 2.9.42

* not flaky anymore

* remove upterm

* formatting

* test hack to confirm a theory

* stricter constraint to allow pods being scheduled

* recategorize test as unstable

* minimal viable mem constraint

* memory constraint applied only for mysql

* unused import

* restore tests

* remove agent constraint

* lower mem restriction

* removed 2.9.29 hack

* decrease mem

* more mem tweak

* CI test
  • Loading branch information
paulomach authored Jun 15, 2023
1 parent dc6977e commit 1b40dfd
Show file tree
Hide file tree
Showing 11 changed files with 363 additions and 310 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ jobs:
uses: charmed-kubernetes/actions-operator@main
with:
provider: microk8s
# This is needed until https://bugs.launchpad.net/juju/+bug/1977582 is fixed
bootstrap-options: "--agent-version 2.9.29"
- name: Download packed charm(s)
uses: actions/download-artifact@v3
with:
Expand Down
139 changes: 100 additions & 39 deletions lib/charms/mysql/v0/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ def wait_until_mysql_connection(self) -> None:

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 32
LIBPATCH = 34

UNIT_TEARDOWN_LOCKNAME = "unit-teardown"
UNIT_ADD_LOCKNAME = "unit-add"


class Error(Exception):
Expand Down Expand Up @@ -290,6 +291,10 @@ class MySQLKillSessionError(Error):
"""Exception raised when there is an issue killing a connection."""


class MySQLLockAcquisitionError(Error):
    """Raised when an operation lock could not be acquired."""


class MySQLRescanClusterError(Error):
    """Raised when rescanning the cluster runs into a problem."""

Expand Down Expand Up @@ -465,20 +470,18 @@ def configure_mysqlrouter_user(
if there is an issue creating and configuring the mysqlrouter user
"""
try:
primary_address = self.get_cluster_primary_address()

escaped_mysqlrouter_user_attributes = json.dumps({"unit_name": unit_name}).replace(
'"', r"\""
)
# Using server_config_user as we are sure it has create user grants
create_mysqlrouter_user_commands = (
f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')",
f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')",
f"session.run_sql(\"CREATE USER '{username}'@'{hostname}' IDENTIFIED BY '{password}' ATTRIBUTE '{escaped_mysqlrouter_user_attributes}';\")",
)

# Using server_config_user as we are sure it has create user grants
mysqlrouter_user_grant_commands = (
f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')",
f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')",
f"session.run_sql(\"GRANT CREATE USER ON *.* TO '{username}'@'{hostname}' WITH GRANT OPTION;\")",
f"session.run_sql(\"GRANT SELECT, INSERT, UPDATE, DELETE, EXECUTE ON mysql_innodb_cluster_metadata.* TO '{username}'@'{hostname}';\")",
f"session.run_sql(\"GRANT SELECT ON mysql.user TO '{username}'@'{hostname}';\")",
Expand Down Expand Up @@ -525,26 +528,28 @@ def create_application_database_and_scoped_user(
if unit_name is not None:
attributes["unit_name"] = unit_name
try:
primary_address = self.get_cluster_primary_address()

# Using server_config_user as we are sure it has create database grants
connect_command = (
f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')",
)
create_database_commands = (
f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')",
f'session.run_sql("CREATE DATABASE IF NOT EXISTS `{database_name}`;")',
)

escaped_user_attributes = json.dumps(attributes).replace('"', r"\"")
# Using server_config_user as we are sure it has create user grants
create_scoped_user_commands = (
f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')",
f"session.run_sql(\"CREATE USER `{username}`@`{hostname}` IDENTIFIED BY '{password}' ATTRIBUTE '{escaped_user_attributes}';\")",
f'session.run_sql("GRANT USAGE ON *.* TO `{username}`@`{hostname}`;")',
f'session.run_sql("GRANT ALL PRIVILEGES ON `{database_name}`.* TO `{username}`@`{hostname}`;")',
)

if create_database:
self._run_mysqlsh_script("\n".join(create_database_commands))
self._run_mysqlsh_script("\n".join(create_scoped_user_commands))
commands = connect_command + create_database_commands + create_scoped_user_commands
else:
commands = connect_command + create_scoped_user_commands

self._run_mysqlsh_script("\n".join(commands))
except MySQLClientError as e:
logger.exception(
f"Failed to create application database {database_name} and scoped user {username}@{hostname}",
Expand Down Expand Up @@ -603,12 +608,9 @@ def delete_users_for_unit(self, unit_name: str) -> None:
Raises:
MySQLDeleteUsersForUnitError if there is an error deleting users for the unit
"""
primary_address = self.get_cluster_primary_address()
if not primary_address:
raise MySQLDeleteUsersForUnitError("Unable to query cluster primary address")
# Using server_config_user as we are sure it has drop user grants
drop_users_command = [
f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')",
f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')",
]
drop_users_command.extend(
self._get_statements_to_delete_users_with_attribute("unit_name", f"'{unit_name}'")
Expand All @@ -629,11 +631,8 @@ def delete_users_for_relation(self, relation_id: int) -> None:
MySQLDeleteUsersForRelationError if there is an error deleting users for the relation
"""
user = f"relation-{str(relation_id)}"
primary_address = self.get_cluster_primary_address()
if not primary_address:
raise MySQLDeleteUsersForRelationError("Unable to query cluster primary address")
drop_users_command = [
f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')",
f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')",
f"session.run_sql(\"DROP USER IF EXISTS '{user}'@'%';\")",
]
# If the relation is with a MySQL Router charm application, delete any users
Expand All @@ -649,11 +648,8 @@ def delete_users_for_relation(self, relation_id: int) -> None:

def delete_user(self, username: str) -> None:
"""Delete user."""
primary_address = self.get_cluster_primary_address()
if not primary_address:
raise MySQLDeleteUserError("Unable to query cluster primary address")
drop_user_command = [
f"shell.connect('{self.server_config_user}:{self.server_config_password}@{primary_address}')",
f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')",
f"session.run_sql(\"DROP USER `{username}`@'%'\")",
]
try:
Expand All @@ -664,11 +660,8 @@ def delete_user(self, username: str) -> None:

def remove_router_from_cluster_metadata(self, router_id: str) -> None:
"""Remove MySQL Router from InnoDB Cluster metadata."""
primary_address = self.get_cluster_primary_address()
if not primary_address:
raise MySQLRemoveRouterFromMetadataError("Unable to query cluster primary address")
command = [
f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{primary_address}')",
f"shell.connect_to_primary('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.instance_address}')",
"cluster = dba.get_cluster()",
f'cluster.remove_router_metadata("{router_id}")',
]
Expand Down Expand Up @@ -746,8 +739,12 @@ def initialize_juju_units_operations_table(self) -> None:
initializing the juju_units_operations table
"""
initialize_table_commands = (
"CREATE TABLE IF NOT EXISTS mysql.juju_units_operations (task varchar(20), executor varchar(20), status varchar(20), primary key(task))",
f"INSERT INTO mysql.juju_units_operations values ('{UNIT_TEARDOWN_LOCKNAME}', '', 'not-started') ON DUPLICATE KEY UPDATE executor = '', status = 'not-started'",
"CREATE TABLE IF NOT EXISTS mysql.juju_units_operations (task varchar(20), executor "
"varchar(20), status varchar(20), primary key(task))",
f"INSERT INTO mysql.juju_units_operations values ('{UNIT_TEARDOWN_LOCKNAME}', '', "
"'not-started') ON DUPLICATE KEY UPDATE executor = '', status = 'not-started'",
f"INSERT INTO mysql.juju_units_operations values ('{UNIT_ADD_LOCKNAME}', '', "
"'not-started') ON DUPLICATE KEY UPDATE executor = '', status = 'not-started'",
)

try:
Expand Down Expand Up @@ -788,12 +785,18 @@ def add_instance_to_cluster(
"label": instance_unit_label,
}

if not self._acquire_lock(
from_instance or self.instance_address, instance_unit_label, UNIT_ADD_LOCKNAME
):
raise MySQLLockAcquisitionError("Lock not acquired")

connect_commands = (
(
f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}"
f"@{from_instance or self.instance_address}')"
),
f"cluster = dba.get_cluster('{self.cluster_name}')",
"shell.options['dba.restartWaitTimeout'] = 3600",
)

for recovery_method in ["auto", "clone"]:
Expand All @@ -816,11 +819,19 @@ def add_instance_to_cluster(
f"Failed to add instance {instance_address} to cluster {self.cluster_name} on {self.instance_address}",
exc_info=e,
)
self._release_lock(
from_instance or self.instance_address,
instance_unit_label,
UNIT_ADD_LOCKNAME,
)
raise MySQLAddInstanceToClusterError(e.message)

logger.debug(
f"Failed to add instance {instance_address} to cluster {self.cluster_name} with recovery method 'auto'. Trying method 'clone'"
)
self._release_lock(
from_instance or self.instance_address, instance_unit_label, UNIT_ADD_LOCKNAME
)

def is_instance_configured_for_innodb(
self, instance_address: str, instance_unit_label: str
Expand Down Expand Up @@ -854,6 +865,34 @@ def is_instance_configured_for_innodb(
)
return False

def are_locks_acquired(self, from_instance: Optional[str] = None) -> bool:
    """Report if any topology change is being executed.

    Query the mysql.juju_units_operations table for any
    in-progress lock for either unit add or removal.

    Args:
        from_instance: member instance to run the command from (fallback to current one)

    Returns:
        True when at least one row has status 'in-progress', or when the
        query itself fails (a failure is treated as "topology is changing").
        False when the count is zero or the count marker is absent from the
        script output.
    """
    commands = (
        (
            f"shell.connect('{self.server_config_user}:{self.server_config_password}"
            f"@{from_instance or self.instance_address}')"
        ),
        "result = session.run_sql(\"SELECT COUNT(*) FROM mysql.juju_units_operations WHERE status='in-progress';\")",
        "print(f'<LOCKS>{result.fetch_one()[0]}</LOCKS>')",
    )

    try:
        output = self._run_mysqlsh_script("\n".join(commands))
    except MySQLClientError:
        # log error and fallback to assuming topology is changing
        logger.exception("Failed to get locks count")
        return True

    # `\d+` (not `\d`): a multi-digit count must still be parsed, otherwise
    # ten or more in-progress rows would be reported as "no locks".
    matches = re.search(r"<LOCKS>(\d+)</LOCKS>", output)
    if not matches:
        return False

    return int(matches.group(1)) > 0

def rescan_cluster(
self,
from_instance: Optional[str] = None,
Expand Down Expand Up @@ -936,7 +975,30 @@ def get_cluster_status(self) -> Optional[dict]:
output_dict = json.loads(output.lower())
return output_dict
except MySQLClientError:
logger.exception(f"Failed to get cluster status for {self.cluster_name}")
logger.error(f"Failed to get cluster status for {self.cluster_name}")

def get_cluster_node_count(self, from_instance: Optional[str] = None) -> int:
    """Retrieve current count of cluster nodes.

    Counts the rows of performance_schema.replication_group_members as seen
    from the instance the script connects to.

    Args:
        from_instance: member instance to run the command from (fallback to current one)

    Returns:
        Amount of cluster nodes, or 0 when the query fails or the count
        marker is absent from the script output.
    """
    size_commands = (
        f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}"
        f"@{from_instance or self.instance_address}')",
        'result = session.run_sql("SELECT COUNT(*) FROM performance_schema.replication_group_members")',
        'print(f"<NODES>{result.fetch_one()[0]}</NODES>")',
    )

    try:
        output = self._run_mysqlsh_script("\n".join(size_commands))
    except MySQLClientError as e:
        logger.warning("Failed to get node count", exc_info=e)
        return 0

    # `\d+` (not `\d`): a single-digit pattern silently turns any count of
    # ten or more into the 0 fallback.
    matches = re.search(r"<NODES>(\d+)</NODES>", output)

    return int(matches.group(1)) if matches else 0

def get_cluster_endpoints(self, get_ips: bool = True) -> Tuple[str, str, str]:
"""Use get_cluster_status to return endpoints tuple.
Expand Down Expand Up @@ -1098,7 +1160,11 @@ def _acquire_lock(self, primary_address: str, unit_label: str, lock_name: str) -
"print(f'<ACQUIRED_LOCK>{acquired_lock}</ACQUIRED_LOCK>')",
)

output = self._run_mysqlsh_script("\n".join(acquire_lock_commands))
try:
output = self._run_mysqlsh_script("\n".join(acquire_lock_commands))
except MySQLClientError:
logger.debug("Failed to acquire lock")
return False
matches = re.search(r"<ACQUIRED_LOCK>(\d)</ACQUIRED_LOCK>", output)
if not matches:
return False
Expand Down Expand Up @@ -1171,9 +1237,8 @@ def get_cluster_primary_address(self, connect_instance_address: str = None) -> O
logger.debug(f"Getting cluster primary member's address from {connect_instance_address}")

get_cluster_primary_commands = (
f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{connect_instance_address}')",
f"cluster = dba.get_cluster('{self.cluster_name}')",
"primary_address = sorted([cluster_member['address'] for cluster_member in cluster.status()['defaultReplicaSet']['topology'].values() if cluster_member['mode'] == 'R/W'])[0]",
f"shell.connect_to_primary('{self.cluster_admin_user}:{self.cluster_admin_password}@{connect_instance_address}')",
"primary_address = shell.parse_uri(session.uri)['host']",
"print(f'<PRIMARY_ADDRESS>{primary_address}</PRIMARY_ADDRESS>')",
)

Expand Down Expand Up @@ -1256,12 +1321,8 @@ def grant_privileges_to_user(
Raises:
MySQLGrantPrivilegesToUserError if there is an issue granting privileges to a user
"""
cluster_primary = self.get_cluster_primary_address()
if not cluster_primary:
raise MySQLGrantPrivilegesToUserError("Failed to get cluster primary address")

grant_privileges_commands = (
f"shell.connect('{self.server_config_user}:{self.server_config_password}@{cluster_primary}')",
f"shell.connect_to_primary('{self.server_config_user}:{self.server_config_password}@{self.instance_address}')",
f"session.run_sql(\"GRANT {', '.join(privileges)} ON *.* TO '{username}'@'{hostname}'{' WITH GRANT OPTION' if with_grant_option else ''}\")",
)

Expand Down
Loading

0 comments on commit 1b40dfd

Please sign in to comment.