DPE-2040 scale down refactor #249

Merged · 6 commits · Jun 30, 2023
57 changes: 22 additions & 35 deletions src/charm.py
@@ -73,7 +73,6 @@
from mysql_k8s_helpers import (
MySQL,
MySQLCreateCustomConfigFileError,
MySQLForceRemoveUnitFromClusterError,
MySQLGetInnoDBBufferPoolParametersError,
MySQLInitialiseMySQLDError,
)
@@ -96,6 +95,9 @@ def __init__(self, *args):
self.framework.observe(self.on.leader_elected, self._on_leader_elected)
self.framework.observe(self.on.config_changed, self._on_config_changed)
self.framework.observe(self.on.update_status, self._on_update_status)
self.framework.observe(
self.on.database_storage_detaching, self._on_database_storage_detaching
)

self.framework.observe(self.on[PEER].relation_joined, self._on_peer_relation_joined)
self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed)
@@ -395,37 +397,6 @@ def _join_unit_to_cluster(self) -> None:
self.unit.status = WaitingStatus("waiting to join the cluster")
logger.debug("waiting: failed to acquire lock when adding instance to cluster")

def _remove_scaled_down_units(self) -> None:
"""Remove scaled down units from the cluster."""
current_units = 1 + len(self.peers.units)
cluster_status = self._mysql.get_cluster_status()
if not cluster_status:
self.unit.status = BlockedStatus("Failed to get cluster status")
return

try:
addresses_of_units_to_remove = [
member["address"]
for unit_name, member in cluster_status["defaultreplicaset"]["topology"].items()
if int(unit_name.split("-")[-1]) >= current_units
]
except ValueError:
# exception can occur if unit is not yet labeled
return

if not addresses_of_units_to_remove:
return

self.unit.status = MaintenanceStatus("Removing scaled down units from cluster")

for unit_address in addresses_of_units_to_remove:
try:
self._mysql.force_remove_unit_from_cluster(unit_address)
except MySQLForceRemoveUnitFromClusterError:
self.unit.status = BlockedStatus("Failed to remove scaled down unit from cluster")
return
self.unit.status = ActiveStatus(self.active_status_message)

def _reconcile_pebble_layer(self, container: Container) -> None:
"""Reconcile pebble layer."""
current_layer = container.get_plan()
@@ -701,9 +672,6 @@ def _on_update_status(self, _: Optional[UpdateStatusEvent]) -> None:
if nodes > 0:
self.app_peer_data["units-added-to-cluster"] = str(nodes)

# Check if there are any scaled down units that need to be removed from the cluster
self._remove_scaled_down_units()

try:
primary_address = self._mysql.get_cluster_primary_address()
except MySQLGetClusterPrimaryAddressError:
@@ -727,6 +695,25 @@ def _on_peer_relation_changed(self, event: RelationChangedEvent) -> None:
if self._is_unit_waiting_to_join_cluster():
self._join_unit_to_cluster()

def _on_database_storage_detaching(self, _) -> None:
"""Handle the database storage detaching event."""
# Only executes if the unit was initialised
if not self.unit_peer_data.get("unit-initialized"):
return

unit_label = self.unit.name.replace("/", "-")

# No need to remove the instance from the cluster if it is not a member of the cluster
if not self._mysql.is_instance_in_cluster(unit_label):
return

# The following operation uses locks to ensure that only one instance is removed
# from the cluster at a time (to avoid split-brain or lack of majority issues)
self._mysql.remove_instance(unit_label)

# Inform other hooks of current status
self.unit_peer_data["unit-status"] = "removing"

# =========================================================================
# Charm action handlers
# =========================================================================
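Note: the new _on_database_storage_detaching handler above delegates the actual removal to self._mysql.remove_instance(unit_label), which lives in the shared charm library and is not part of this diff. As a rough illustration of the lock-guarded pattern its comment refers to, here is a minimal sketch; _acquire_lock, _release_lock and _get_unit_address are hypothetical helper names, while _run_mysqlsh_script mirrors the wrapper used elsewhere in this repository.

def remove_instance(self, unit_label: str) -> None:
    """Sketch: remove one unit from the cluster, serialised by a lock."""
    # Hypothetical lock helpers; the real library's names and signatures
    # may differ. Holding the lock keeps two units from leaving at the
    # same time, which could otherwise cost the cluster its majority.
    if not self._acquire_lock("unit-teardown", unit_label):
        raise RuntimeError("another unit is being removed; retry later")
    try:
        # Hypothetical resolver from the unit label to a host:port address.
        target_address = self._get_unit_address(unit_label)
        self._run_mysqlsh_script(
            "\n".join(
                (
                    f"shell.connect('{self.cluster_admin_user}:"
                    f"{self.cluster_admin_password}@{self.instance_address}')",
                    f"cluster = dba.get_cluster('{self.cluster_name}')",
                    f"cluster.remove_instance('{target_address}')",
                )
            )
        )
    finally:
        self._release_lock("unit-teardown", unit_label)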
51 changes: 0 additions & 51 deletions src/mysql_k8s_helpers.py
@@ -80,10 +80,6 @@ class MySQLDeleteUsersWithLabelError(Error):
"""Exception raised when there is an issue deleting users with a label."""


class MySQLForceRemoveUnitFromClusterError(Error):
"""Exception raised when there is an issue force removing a unit from the cluster."""


class MySQLWaitUntilUnitRemovedFromClusterError(Error):
"""Exception raised when there is an issue checking if a unit is removed from the cluster."""

@@ -423,53 +419,6 @@ def _wait_until_unit_removed_from_cluster(self, unit_address: str) -> None:
if unit_address in members_in_cluster:
raise MySQLWaitUntilUnitRemovedFromClusterError("Remove member still in cluster")

def force_remove_unit_from_cluster(self, unit_address: str) -> None:
"""Force removes the provided unit from the cluster.

Args:
unit_address: The address of unit to force remove from cluster

Raises:
MySQLForceRemoveUnitFromClusterError - if there was an issue force
removing the unit from the cluster
"""
cluster_status = self.get_cluster_status()
if not cluster_status:
raise MySQLForceRemoveUnitFromClusterError()

remove_instance_options = {
"force": "true",
}
remove_instance_commands = (
f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.instance_address}')",
f"cluster = dba.get_cluster('{self.cluster_name}')",
f"cluster.remove_instance('{unit_address}', {json.dumps(remove_instance_options)})",
)

try:
if cluster_status["defaultreplicaset"]["status"] == "no_quorum":
logger.warning("Cluster has no quorum. Forcing quorum using this instance.")

force_quorum_commands = (
f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.instance_address}')",
f"cluster = dba.get_cluster('{self.cluster_name}')",
f"cluster.force_quorum_using_partition_of('{self.cluster_admin_user}@{self.instance_address}', '{self.cluster_admin_password}')",
)

self._run_mysqlsh_script("\n".join(force_quorum_commands))

self._run_mysqlsh_script("\n".join(remove_instance_commands))

self._wait_until_unit_removed_from_cluster(unit_address)
except (
MySQLClientError,
MySQLWaitUntilUnitRemovedFromClusterError,
) as e:
logger.exception(
f"Failed to force remove instance {unit_address} from cluster", exc_info=e
)
raise MySQLForceRemoveUnitFromClusterError(e.message)

def create_database(self, database_name: str) -> None:
"""Creates a database.

6 changes: 6 additions & 0 deletions src/relations/mysql_provider.py
@@ -292,6 +292,12 @@ def _on_database_broken(self, event: RelationBrokenEvent) -> None:
# run once by the leader
return

if self.charm.unit_peer_data.get("unit-status", None) == "removing":
# safeguard against relation broken being triggered for
# a unit being torn down (instead of un-related). See:
# https://bugs.launchpad.net/juju/+bug/1979811
return

if len(self.model.relations[DB_RELATION_NAME]) == 1:
# remove kubernetes service when last relation is removed
self.charm.k8s_helpers.delete_endpoint_services(["primary", "replicas"])
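Note: the guard added above works around the linked Juju issue, where relation-broken can fire for a unit that is being torn down rather than genuinely un-related. A minimal, self-contained sketch of the intended behaviour (not part of this PR), using a stand-in handler and MagicMock in place of the real provider and charm objects:

from unittest.mock import MagicMock

def _on_database_broken(provider, event):
    # Mirrors the guard added in this PR: skip cleanup while the unit
    # itself is being removed (unit-status == "removing").
    if provider.charm.unit_peer_data.get("unit-status") == "removing":
        return "skipped"
    return "cleaned-up"

provider = MagicMock()
provider.charm.unit_peer_data = {"unit-status": "removing"}
assert _on_database_broken(provider, None) == "skipped"

provider.charm.unit_peer_data = {}
assert _on_database_broken(provider, None) == "cleaned-up"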
9 changes: 2 additions & 7 deletions tests/integration/test_charm.py
@@ -149,20 +149,15 @@ async def test_scale_up_and_down(ops_test: OpsTest) -> None:
]
assert len(online_member_addresses) == 5

logger.info("Scale down to one unit")
await scale_application(ops_test, APP_NAME, 1, wait=False)

await ops_test.model.block_until(
lambda: len(ops_test.model.applications[APP_NAME].units) == 1
and ops_test.model.applications[APP_NAME].units[0].workload_status
in ("maintenance", "error", "blocked")
)
assert ops_test.model.applications[APP_NAME].units[0].workload_status == "maintenance"

await ops_test.model.wait_for_idle(
apps=[APP_NAME],
status="active",
raise_on_blocked=True,
timeout=TIMEOUT,
wait_for_exact_units=1,
)

random_unit = ops_test.model.applications[APP_NAME].units[0]
83 changes: 1 addition & 82 deletions tests/unit/test_mysql_k8s_helpers.py
@@ -3,7 +3,7 @@

import json
import unittest
from unittest.mock import MagicMock, call, patch
from unittest.mock import MagicMock, patch

import tenacity
from charms.mysql.v0.mysql import MySQLClientError, MySQLConfigureMySQLUsersError
@@ -17,7 +17,6 @@
MySQLCreateUserError,
MySQLDeleteUsersWithLabelError,
MySQLEscalateUserPrivilegesError,
MySQLForceRemoveUnitFromClusterError,
MySQLInitialiseMySQLDError,
MySQLWaitUntilUnitRemovedFromClusterError,
)
@@ -337,86 +336,6 @@ def test_wait_until_unit_removed_from_cluster_exception(self, _get_cluster_status
with self.assertRaises(MySQLWaitUntilUnitRemovedFromClusterError):
self.mysql._wait_until_unit_removed_from_cluster("mysql-0.mysql-endpoints")

@patch("mysql_k8s_helpers.MySQL.get_cluster_status", return_value=GET_CLUSTER_STATUS_RETURN)
@patch("mysql_k8s_helpers.MySQL._run_mysqlsh_script")
@patch("mysql_k8s_helpers.MySQL._wait_until_unit_removed_from_cluster")
def test_force_remove_unit_from_cluster(
self, _wait_until_unit_removed_from_cluster, _run_mysqlsh_script, _get_cluster_status
):
"""Test the successful execution of force_remove_unit_from_cluster."""
_expected_remove_instance_commands = "\n".join(
(
"shell.connect('clusteradmin:clusteradminpassword@127.0.0.1')",
"cluster = dba.get_cluster('test_cluster')",
'cluster.remove_instance(\'1.2.3.4\', {"force": "true"})',
)
)

_expected_force_quorum_commands = "\n".join(
(
"shell.connect('clusteradmin:clusteradminpassword@127.0.0.1')",
"cluster = dba.get_cluster('test_cluster')",
"cluster.force_quorum_using_partition_of('clusteradmin@127.0.0.1', 'clusteradminpassword')",
)
)

self.mysql.force_remove_unit_from_cluster("1.2.3.4")

self.assertEqual(_get_cluster_status.call_count, 1)
self.assertEqual(_run_mysqlsh_script.call_count, 2)
self.assertEqual(_wait_until_unit_removed_from_cluster.call_count, 1)
self.assertEqual(
sorted(_run_mysqlsh_script.mock_calls),
sorted(
[
call(_expected_remove_instance_commands),
call(_expected_force_quorum_commands),
]
),
)

@patch("mysql_k8s_helpers.MySQL.get_cluster_status", return_value=GET_CLUSTER_STATUS_RETURN)
@patch("mysql_k8s_helpers.MySQL._run_mysqlsh_script")
@patch("mysql_k8s_helpers.MySQL._wait_until_unit_removed_from_cluster")
def test_force_remove_unit_from_cluster_exception(
self, _wait_until_unit_removed_from_cluster, _run_mysqlsh_script, _get_cluster_status
):
"""Test exceptions raised when executing force_remove_unit_from_cluster."""
_get_cluster_status.return_value = None

with self.assertRaises(MySQLForceRemoveUnitFromClusterError):
self.mysql.force_remove_unit_from_cluster("1.2.3.4")

self.assertEqual(_get_cluster_status.call_count, 1)
self.assertEqual(_run_mysqlsh_script.call_count, 0)
self.assertEqual(_wait_until_unit_removed_from_cluster.call_count, 0)

_get_cluster_status.reset_mock()
_get_cluster_status.return_value = GET_CLUSTER_STATUS_RETURN
_run_mysqlsh_script.side_effect = MySQLClientError("Mock error")

with self.assertRaises(MySQLForceRemoveUnitFromClusterError):
self.mysql.force_remove_unit_from_cluster("1.2.3.4")

self.assertEqual(_get_cluster_status.call_count, 1)
self.assertEqual(_run_mysqlsh_script.call_count, 1)
self.assertEqual(_wait_until_unit_removed_from_cluster.call_count, 0)

_get_cluster_status.reset_mock()
_get_cluster_status.return_value = GET_CLUSTER_STATUS_RETURN
_run_mysqlsh_script.reset_mock()
_run_mysqlsh_script.side_effect = None
_wait_until_unit_removed_from_cluster.side_effect = (
MySQLWaitUntilUnitRemovedFromClusterError("Mock error")
)

with self.assertRaises(MySQLForceRemoveUnitFromClusterError):
self.mysql.force_remove_unit_from_cluster("1.2.3.4")

self.assertEqual(_get_cluster_status.call_count, 1)
self.assertEqual(_run_mysqlsh_script.call_count, 2)
self.assertEqual(_wait_until_unit_removed_from_cluster.call_count, 1)

@patch("ops.model.Container")
def test_safe_stop_mysqld_safe(self, _container):
"""Test the successful execution of safe_stop_mysqld_safe."""