DPE-2040 scale down refactor (#249)
* self scale down using the storage-detaching event

* remaining unit no longer enters maintenance

* remove unused code

* add bug reference
paulomach authored Jun 30, 2023
1 parent c977411 commit 422e1b7
Showing 5 changed files with 31 additions and 175 deletions.
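
The core of the refactor: instead of the leader force-removing "scaled down" peers during update-status (the _remove_scaled_down_units / force_remove_unit_from_cluster path deleted below), each unit now removes itself from the InnoDB cluster when its own storage is about to detach. The following is a condensed, standalone sketch of that flow, not code from this commit: the collaborators are stubbed (in the charm they are self._mysql and self.unit_peer_data), and remove_instance is implemented outside the files touched here.

from typing import Dict, Protocol


class MySQLFacade(Protocol):
    """The two calls the handler needs; implemented by the charm's MySQL helper."""

    def is_instance_in_cluster(self, unit_label: str) -> bool: ...

    def remove_instance(self, unit_label: str) -> None: ...


def handle_storage_detaching(
    unit_name: str, unit_peer_data: Dict[str, str], mysql: MySQLFacade
) -> None:
    """What each unit does on database-storage-detaching (see charm.py below)."""
    if not unit_peer_data.get("unit-initialized"):
        # the unit never finished joining the cluster, so there is nothing to remove
        return

    unit_label = unit_name.replace("/", "-")
    if not mysql.is_instance_in_cluster(unit_label):
        # already removed (or never added) -- nothing to do
        return

    # remove_instance is lock-guarded so only one unit leaves the cluster at a
    # time, avoiding loss of quorum during a multi-unit scale down
    mysql.remove_instance(unit_label)

    # let later hooks on this unit (e.g. relation-broken) know it is going away
    unit_peer_data["unit-status"] = "removing"
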
57 changes: 22 additions & 35 deletions src/charm.py
@@ -73,7 +73,6 @@
from mysql_k8s_helpers import (
MySQL,
MySQLCreateCustomConfigFileError,
MySQLForceRemoveUnitFromClusterError,
MySQLGetInnoDBBufferPoolParametersError,
MySQLInitialiseMySQLDError,
)
@@ -96,6 +95,9 @@ def __init__(self, *args):
self.framework.observe(self.on.leader_elected, self._on_leader_elected)
self.framework.observe(self.on.config_changed, self._on_config_changed)
self.framework.observe(self.on.update_status, self._on_update_status)
self.framework.observe(
self.on.database_storage_detaching, self._on_database_storage_detaching
)

self.framework.observe(self.on[PEER].relation_joined, self._on_peer_relation_joined)
self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed)
@@ -395,37 +397,6 @@ def _join_unit_to_cluster(self) -> None:
self.unit.status = WaitingStatus("waiting to join the cluster")
logger.debug("waiting: failed to acquire lock when adding instance to cluster")

def _remove_scaled_down_units(self) -> None:
"""Remove scaled down units from the cluster."""
current_units = 1 + len(self.peers.units)
cluster_status = self._mysql.get_cluster_status()
if not cluster_status:
self.unit.status = BlockedStatus("Failed to get cluster status")
return

try:
addresses_of_units_to_remove = [
member["address"]
for unit_name, member in cluster_status["defaultreplicaset"]["topology"].items()
if int(unit_name.split("-")[-1]) >= current_units
]
except ValueError:
# exception can occur if unit is not yet labeled
return

if not addresses_of_units_to_remove:
return

self.unit.status = MaintenanceStatus("Removing scaled down units from cluster")

for unit_address in addresses_of_units_to_remove:
try:
self._mysql.force_remove_unit_from_cluster(unit_address)
except MySQLForceRemoveUnitFromClusterError:
self.unit.status = BlockedStatus("Failed to remove scaled down unit from cluster")
return
self.unit.status = ActiveStatus(self.active_status_message)

def _reconcile_pebble_layer(self, container: Container) -> None:
"""Reconcile pebble layer."""
current_layer = container.get_plan()
@@ -701,9 +672,6 @@ def _on_update_status(self, _: Optional[UpdateStatusEvent]) -> None:
if nodes > 0:
self.app_peer_data["units-added-to-cluster"] = str(nodes)

# Check if there are any scaled down units that need to be removed from the cluster
self._remove_scaled_down_units()

try:
primary_address = self._mysql.get_cluster_primary_address()
except MySQLGetClusterPrimaryAddressError:
@@ -727,6 +695,25 @@ def _on_peer_relation_changed(self, event: RelationChangedEvent) -> None:
if self._is_unit_waiting_to_join_cluster():
self._join_unit_to_cluster()

def _on_database_storage_detaching(self, _) -> None:
"""Handle the database storage detaching event."""
# Only executes if the unit was initialised
if not self.unit_peer_data.get("unit-initialized"):
return

unit_label = self.unit.name.replace("/", "-")

# No need to remove the instance from the cluster if it is not a member of the cluster
if not self._mysql.is_instance_in_cluster(unit_label):
return

# The following operation uses locks to ensure that only one instance is removed
# from the cluster at a time (to avoid split-brain or lack of majority issues)
self._mysql.remove_instance(unit_label)

# Inform other hooks of current status
self.unit_peer_data["unit-status"] = "removing"

# =========================================================================
# Charm action handlers
# =========================================================================
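This diff does not add a unit test for the new handler. Below is a hypothetical test sketch, assuming the standalone handle_storage_detaching sketch near the top of this page is saved as scale_down_sketch.py; none of these names come from the repository.

import unittest
from unittest.mock import MagicMock

from scale_down_sketch import handle_storage_detaching


class TestStorageDetaching(unittest.TestCase):
    def test_initialized_member_removes_itself(self):
        mysql = MagicMock()
        mysql.is_instance_in_cluster.return_value = True
        peer_data = {"unit-initialized": "True"}

        handle_storage_detaching("mysql-k8s/1", peer_data, mysql)

        # the unit removed itself and flagged the removal for later hooks
        mysql.remove_instance.assert_called_once_with("mysql-k8s-1")
        self.assertEqual(peer_data["unit-status"], "removing")

    def test_uninitialized_unit_is_a_noop(self):
        mysql = MagicMock()

        handle_storage_detaching("mysql-k8s/1", {}, mysql)

        mysql.is_instance_in_cluster.assert_not_called()
        mysql.remove_instance.assert_not_called()


if __name__ == "__main__":
    unittest.main()
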
51 changes: 0 additions & 51 deletions src/mysql_k8s_helpers.py
@@ -80,10 +80,6 @@ class MySQLDeleteUsersWithLabelError(Error):
"""Exception raised when there is an issue deleting users with a label."""


class MySQLForceRemoveUnitFromClusterError(Error):
"""Exception raised when there is an issue force removing a unit from the cluster."""


class MySQLWaitUntilUnitRemovedFromClusterError(Error):
"""Exception raised when there is an issue checking if a unit is removed from the cluster."""

@@ -423,53 +419,6 @@ def _wait_until_unit_removed_from_cluster(self, unit_address: str) -> None:
if unit_address in members_in_cluster:
raise MySQLWaitUntilUnitRemovedFromClusterError("Remove member still in cluster")

def force_remove_unit_from_cluster(self, unit_address: str) -> None:
"""Force removes the provided unit from the cluster.
Args:
unit_address: The address of unit to force remove from cluster
Raises:
MySQLForceRemoveUnitFromClusterError - if there was an issue force
removing the unit from the cluster
"""
cluster_status = self.get_cluster_status()
if not cluster_status:
raise MySQLForceRemoveUnitFromClusterError()

remove_instance_options = {
"force": "true",
}
remove_instance_commands = (
f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.instance_address}')",
f"cluster = dba.get_cluster('{self.cluster_name}')",
f"cluster.remove_instance('{unit_address}', {json.dumps(remove_instance_options)})",
)

try:
if cluster_status["defaultreplicaset"]["status"] == "no_quorum":
logger.warning("Cluster has no quorum. Forcing quorum using this instance.")

force_quorum_commands = (
f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.instance_address}')",
f"cluster = dba.get_cluster('{self.cluster_name}')",
f"cluster.force_quorum_using_partition_of('{self.cluster_admin_user}@{self.instance_address}', '{self.cluster_admin_password}')",
)

self._run_mysqlsh_script("\n".join(force_quorum_commands))

self._run_mysqlsh_script("\n".join(remove_instance_commands))

self._wait_until_unit_removed_from_cluster(unit_address)
except (
MySQLClientError,
MySQLWaitUntilUnitRemovedFromClusterError,
) as e:
logger.exception(
f"Failed to force remove instance {unit_address} from cluster", exc_info=e
)
raise MySQLForceRemoveUnitFromClusterError(e.message)

def create_database(self, database_name: str) -> None:
"""Creates a database.
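For contrast with the force_remove_unit_from_cluster code deleted above: a graceful removal at the MySQL Shell level follows the same command pattern but drops the {"force": "true"} option and the quorum forcing, because the departing instance is still online. The sketch below only builds the script text; the connection details and the argument passed to remove_instance are assumptions, and the charm's actual removal logic lives in the shared mysql charm library, outside this diff.

def build_remove_instance_script(
    cluster_admin_user: str,
    cluster_admin_password: str,
    connect_address: str,
    cluster_name: str,
    instance_address: str,
) -> str:
    """Return a mysqlsh script that gracefully removes one online instance."""
    commands = (
        f"shell.connect('{cluster_admin_user}:{cluster_admin_password}@{connect_address}')",
        f"cluster = dba.get_cluster('{cluster_name}')",
        # no force option: the instance leaves cleanly and group membership stays consistent
        f"cluster.remove_instance('{instance_address}')",
    )
    return "\n".join(commands)


print(build_remove_instance_script(
    "clusteradmin", "clusteradminpassword", "127.0.0.1", "test_cluster", "1.2.3.4"
))
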
6 changes: 6 additions & 0 deletions src/relations/mysql_provider.py
@@ -292,6 +292,12 @@ def _on_database_broken(self, event: RelationBrokenEvent) -> None:
# run once by the leader
return

if self.charm.unit_peer_data.get("unit-status", None) == "removing":
# safeguard against relation broken being triggered for
# a unit being torn down (instead of un-related). See:
# https://bugs.launchpad.net/juju/+bug/1979811
return

if len(self.model.relations[DB_RELATION_NAME]) == 1:
# remove kubernetes service when last relation is removed
self.charm.k8s_helpers.delete_endpoint_services(["primary", "replicas"])
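The guard above keys off the "removing" flag written by _on_database_storage_detaching: on a unit that is being torn down, Juju also fires relation-broken (LP#1979811), and without the check a dying leader would tear down relation resources that are still in use. A minimal standalone illustration of the check, not code from this diff:

def should_skip_database_broken_cleanup(unit_peer_data: dict) -> bool:
    """True when relation-broken fired because this unit is being removed,
    not because the application was actually un-related."""
    return unit_peer_data.get("unit-status") == "removing"


# unit tearing down during scale-in: skip the cleanup
assert should_skip_database_broken_cleanup({"unit-status": "removing"})
# genuine relation removal: proceed with the cleanup
assert not should_skip_database_broken_cleanup({})
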
9 changes: 2 additions & 7 deletions tests/integration/test_charm.py
@@ -149,20 +149,15 @@ async def test_scale_up_and_down(ops_test: OpsTest) -> None:
]
assert len(online_member_addresses) == 5

logger.info("Scale down to one unit")
await scale_application(ops_test, APP_NAME, 1, wait=False)

await ops_test.model.block_until(
lambda: len(ops_test.model.applications[APP_NAME].units) == 1
and ops_test.model.applications[APP_NAME].units[0].workload_status
in ("maintenance", "error", "blocked")
)
assert ops_test.model.applications[APP_NAME].units[0].workload_status == "maintenance"

await ops_test.model.wait_for_idle(
apps=[APP_NAME],
status="active",
raise_on_blocked=True,
timeout=TIMEOUT,
wait_for_exact_units=1,
)

random_unit = ops_test.model.applications[APP_NAME].units[0]
83 changes: 1 addition & 82 deletions tests/unit/test_mysql_k8s_helpers.py
@@ -3,7 +3,7 @@

import json
import unittest
from unittest.mock import MagicMock, call, patch
from unittest.mock import MagicMock, patch

import tenacity
from charms.mysql.v0.mysql import MySQLClientError, MySQLConfigureMySQLUsersError
@@ -17,7 +17,6 @@
MySQLCreateUserError,
MySQLDeleteUsersWithLabelError,
MySQLEscalateUserPrivilegesError,
MySQLForceRemoveUnitFromClusterError,
MySQLInitialiseMySQLDError,
MySQLWaitUntilUnitRemovedFromClusterError,
)
@@ -337,86 +336,6 @@ def test_wait_until_unit_removed_from_cluster_exception(self, _get_cluster_statu
with self.assertRaises(MySQLWaitUntilUnitRemovedFromClusterError):
self.mysql._wait_until_unit_removed_from_cluster("mysql-0.mysql-endpoints")

@patch("mysql_k8s_helpers.MySQL.get_cluster_status", return_value=GET_CLUSTER_STATUS_RETURN)
@patch("mysql_k8s_helpers.MySQL._run_mysqlsh_script")
@patch("mysql_k8s_helpers.MySQL._wait_until_unit_removed_from_cluster")
def test_force_remove_unit_from_cluster(
self, _wait_until_unit_removed_from_cluster, _run_mysqlsh_script, _get_cluster_status
):
"""Test the successful execution of force_remove_unit_from_cluster."""
_expected_remove_instance_commands = "\n".join(
(
"shell.connect('clusteradmin:clusteradminpassword@127.0.0.1')",
"cluster = dba.get_cluster('test_cluster')",
'cluster.remove_instance(\'1.2.3.4\', {"force": "true"})',
)
)

_expected_force_quorum_commands = "\n".join(
(
"shell.connect('clusteradmin:clusteradminpassword@127.0.0.1')",
"cluster = dba.get_cluster('test_cluster')",
"cluster.force_quorum_using_partition_of('clusteradmin@127.0.0.1', 'clusteradminpassword')",
)
)

self.mysql.force_remove_unit_from_cluster("1.2.3.4")

self.assertEqual(_get_cluster_status.call_count, 1)
self.assertEqual(_run_mysqlsh_script.call_count, 2)
self.assertEqual(_wait_until_unit_removed_from_cluster.call_count, 1)
self.assertEqual(
sorted(_run_mysqlsh_script.mock_calls),
sorted(
[
call(_expected_remove_instance_commands),
call(_expected_force_quorum_commands),
]
),
)

@patch("mysql_k8s_helpers.MySQL.get_cluster_status", return_value=GET_CLUSTER_STATUS_RETURN)
@patch("mysql_k8s_helpers.MySQL._run_mysqlsh_script")
@patch("mysql_k8s_helpers.MySQL._wait_until_unit_removed_from_cluster")
def test_force_remove_unit_from_cluster_exception(
self, _wait_until_unit_removed_from_cluster, _run_mysqlsh_script, _get_cluster_status
):
"""Test exceptions raised when executing force_remove_unit_from_cluster."""
_get_cluster_status.return_value = None

with self.assertRaises(MySQLForceRemoveUnitFromClusterError):
self.mysql.force_remove_unit_from_cluster("1.2.3.4")

self.assertEqual(_get_cluster_status.call_count, 1)
self.assertEqual(_run_mysqlsh_script.call_count, 0)
self.assertEqual(_wait_until_unit_removed_from_cluster.call_count, 0)

_get_cluster_status.reset_mock()
_get_cluster_status.return_value = GET_CLUSTER_STATUS_RETURN
_run_mysqlsh_script.side_effect = MySQLClientError("Mock error")

with self.assertRaises(MySQLForceRemoveUnitFromClusterError):
self.mysql.force_remove_unit_from_cluster("1.2.3.4")

self.assertEqual(_get_cluster_status.call_count, 1)
self.assertEqual(_run_mysqlsh_script.call_count, 1)
self.assertEqual(_wait_until_unit_removed_from_cluster.call_count, 0)

_get_cluster_status.reset_mock()
_get_cluster_status.return_value = GET_CLUSTER_STATUS_RETURN
_run_mysqlsh_script.reset_mock()
_run_mysqlsh_script.side_effect = None
_wait_until_unit_removed_from_cluster.side_effect = (
MySQLWaitUntilUnitRemovedFromClusterError("Mock error")
)

with self.assertRaises(MySQLForceRemoveUnitFromClusterError):
self.mysql.force_remove_unit_from_cluster("1.2.3.4")

self.assertEqual(_get_cluster_status.call_count, 1)
self.assertEqual(_run_mysqlsh_script.call_count, 2)
self.assertEqual(_wait_until_unit_removed_from_cluster.call_count, 1)

@patch("ops.model.Container")
def test_safe_stop_mysqld_safe(self, _container):
"""Test the successful execution of safe_stop_mysqld_safe."""
