[DPE-4118] Address drained units rejoining the cluster with a new PV (#433)

* Address drained units rejoining the cluster with a new PV

* WIP: Address pod eviction and pvc deletion during node drain

* Address PR feedback + add integration test for node drain

* Fix failing unit test

* Add missing kwarg used for get_cluster_status

* Fix various bugs related to retrieving cluster name

* Update data_interfaces charm lib + pull in PR version of mysql charm lib

* Pull in latest version of the mysql charm lib

* Adjustments based on the implementation of MySQL.cluster_metadata_exists

* Remove instance explicitly and rescan to clean stale user accounts

* Update the mysql lib to test latest changes in the vm charm

* Force remove instances during node drain

* Update all outdated charm libs
shayancanonical committed Jul 23, 2024
1 parent ee7b04c commit d97adc2
Showing 12 changed files with 320 additions and 69 deletions.
25 changes: 24 additions & 1 deletion lib/charms/data_platform_libs/v0/data_interfaces.py
@@ -331,7 +331,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 37
LIBPATCH = 38

PYDEPS = ["ops>=2.0.0"]

@@ -2606,6 +2606,14 @@ def set_version(self, relation_id: int, version: str) -> None:
"""
self.update_relation_data(relation_id, {"version": version})

def set_subordinated(self, relation_id: int) -> None:
"""Raises the subordinated flag in the application relation databag.
Args:
relation_id: the identifier for a particular relation.
"""
self.update_relation_data(relation_id, {"subordinated": "true"})


class DatabaseProviderEventHandlers(EventHandlers):
"""Provider-side of the database relation handlers."""
@@ -2842,6 +2850,21 @@ def _on_relation_created_event(self, event: RelationCreatedEvent) -> None:

def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
"""Event emitted when the database relation has changed."""
is_subordinate = False
remote_unit_data = None
for key in event.relation.data.keys():
if isinstance(key, Unit) and not key.name.startswith(self.charm.app.name):
remote_unit_data = event.relation.data[key]
elif isinstance(key, Application) and key.name != self.charm.app.name:
is_subordinate = event.relation.data[key].get("subordinated") == "true"

if is_subordinate:
if not remote_unit_data:
return

if remote_unit_data.get("state") != "ready":
return

# Check which data has changed to emit customs events.
diff = self._diff(event)

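The new `set_subordinated` helper pairs with the requirer-side guard above: a subordinated provider raises the flag in its application databag, and requirers then wait for the remote unit to report `state: ready` before emitting custom events. Below is a minimal sketch of a provider charm raising the flag; the charm class, relation name, and event wiring are illustrative assumptions, not part of this commit.

```python
# Hedged sketch: raising the new "subordinated" flag from a provider charm.
# The charm class, relation name, and event wiring here are illustrative only.
from charms.data_platform_libs.v0.data_interfaces import DatabaseProvides
from ops.charm import CharmBase
from ops.main import main


class SubordinatedProviderCharm(CharmBase):
    def __init__(self, *args):
        super().__init__(*args)
        self.database = DatabaseProvides(self, relation_name="database")
        self.framework.observe(
            self.database.on.database_requested, self._on_database_requested
        )

    def _on_database_requested(self, event) -> None:
        if not self.unit.is_leader():
            # only the leader may write the application relation databag
            return
        # Mark this relation as served by a subordinated provider so the
        # requirer defers processing until the remote unit reports "ready".
        self.database.set_subordinated(event.relation.id)


if __name__ == "__main__":
    main(SubordinatedProviderCharm)
```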
15 changes: 1 addition & 14 deletions lib/charms/mysql/v0/async_replication.py
@@ -54,7 +54,7 @@
# The unique Charmhub library identifier, never change it
LIBID = "4de21f1a022c4e2c87ac8e672ec16f6a"
LIBAPI = 0
LIBPATCH = 4
LIBPATCH = 5

RELATION_OFFER = "replication-offer"
RELATION_CONSUMER = "replication"
@@ -248,8 +248,6 @@ def on_async_relation_broken(self, event: RelationBrokenEvent):  # noqa: C901
"\tThe cluster can be recreated with the `recreate-cluster` action.\n"
"\tAlternatively the cluster can be rejoined to the cluster set."
)
# reset the cluster node count flag
del self._charm.app_peer_data["units-added-to-cluster"]
# set flag to persist removed from cluster-set state
self._charm.app_peer_data["removed-from-cluster-set"] = "true"

@@ -834,8 +832,6 @@ def _on_consumer_changed(self, event):  # noqa: C901
self._charm.unit.status = MaintenanceStatus("Dissolving replica cluster")
logger.info("Dissolving replica cluster")
self._charm._mysql.dissolve_cluster()
# reset the cluster node count flag
del self._charm.app_peer_data["units-added-to-cluster"]
# reset force rejoin-secondaries flag
del self._charm.app_peer_data["rejoin-secondaries"]

@@ -869,11 +865,6 @@ def _on_consumer_changed(self, event):  # noqa: C901
if cluster_set_domain_name := self._charm._mysql.get_cluster_set_name():
self._charm.app_peer_data["cluster-set-domain-name"] = cluster_set_domain_name

# set the number of units added to the cluster for a single unit replica cluster
# needed here since it will skip the `RECOVERING` state
if self._charm.app.planned_units() == 1:
self._charm.app_peer_data["units-added-to-cluster"] = "1"

self._charm._on_update_status(None)
elif state == States.RECOVERING:
# recovering cluster (copying data and/or joining units)
@@ -882,10 +873,6 @@ def _on_consumer_changed(self, event):  # noqa: C901
"Waiting for recovery to complete on other units"
)
logger.debug("Awaiting other units to join the cluster")
# reset the number of units added to the cluster
# this will trigger secondaries to join the cluster
node_count = self._charm._mysql.get_cluster_node_count()
self._charm.app_peer_data["units-added-to-cluster"] = str(node_count)
# set state flags to allow secondaries to join the cluster
self._charm.unit_peer_data["member-state"] = "online"
self._charm.unit_peer_data["member-role"] = "primary"
36 changes: 23 additions & 13 deletions lib/charms/mysql/v0/mysql.py
@@ -128,7 +128,7 @@ def wait_until_mysql_connection(self) -> None:
# Increment this major API version when introducing breaking changes
LIBAPI = 0

LIBPATCH = 62
LIBPATCH = 64

UNIT_TEARDOWN_LOCKNAME = "unit-teardown"
UNIT_ADD_LOCKNAME = "unit-add"
@@ -589,7 +589,6 @@ def create_cluster(self) -> None:
# rescan cluster for cleanup of unused
# recovery users
self._mysql.rescan_cluster()
self.app_peer_data["units-added-to-cluster"] = "1"

state, role = self._mysql.get_member_state()

@@ -1779,6 +1778,27 @@ def _get_host_ip(host: str) -> str:

return ",".join(rw_endpoints), ",".join(ro_endpoints), ",".join(no_endpoints)

def execute_remove_instance(
self, connect_instance: Optional[str] = None, force: bool = False
) -> None:
"""Execute the remove_instance() script with mysqlsh.
Args:
connect_instance: (optional) The instance from where to run the remove_instance()
force: (optional) Whether to force the removal of the instance
"""
remove_instance_options = {
"password": self.cluster_admin_password,
"force": "true" if force else "false",
}
remove_instance_commands = (
f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{connect_instance or self.instance_address}')",
f"cluster = dba.get_cluster('{self.cluster_name}')",
"cluster.remove_instance("
f"'{self.cluster_admin_user}@{self.instance_address}', {remove_instance_options})",
)
self._run_mysqlsh_script("\n".join(remove_instance_commands))

@retry(
retry=retry_if_exception_type(MySQLRemoveInstanceRetryError),
stop=stop_after_attempt(15),
@@ -1842,17 +1862,7 @@ def remove_instance(self, unit_label: str, lock_instance: Optional[str] = None)
)

# Just remove instance
remove_instance_options = {
"password": self.cluster_admin_password,
"force": "true",
}
remove_instance_commands = (
f"shell.connect('{self.cluster_admin_user}:{self.cluster_admin_password}@{self.instance_address}')",
f"cluster = dba.get_cluster('{self.cluster_name}')",
"cluster.remove_instance("
f"'{self.cluster_admin_user}@{self.instance_address}', {remove_instance_options})",
)
self._run_mysqlsh_script("\n".join(remove_instance_commands))
self.execute_remove_instance(force=True)
except MySQLClientError as e:
# In case of an error, raise an error and retry
logger.warning(
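The new `execute_remove_instance` helper factors the mysqlsh `cluster.remove_instance()` call out of the retry-wrapped `remove_instance()` so it can also be invoked directly when a rejoining unit still has a stale registration in the cluster metadata. A rough sketch of that flow is below, assuming the helper names used in this commit (`get_cluster_status`, `rescan_cluster`, `add_instance_to_cluster`); it mirrors the `join_unit_to_cluster` change in src/charm.py further down and is not itself part of the diff.

```python
# Hedged sketch of the rejoin-after-drain flow; `mysql` stands in for the
# charm's MySQL helper object and argument names follow the diffs in this commit.
def rejoin_with_fresh_storage(mysql, primary: str, address: str, label: str) -> None:
    status = mysql.get_cluster_status(from_instance=primary)
    topology = status["defaultreplicaset"]["topology"]
    if label in topology:
        # Stale registration left behind by the drained pod: force-remove it
        # from the primary, then rescan to drop leftover recovery accounts.
        mysql.execute_remove_instance(connect_instance=primary, force=True)
        mysql.rescan_cluster(from_instance=primary)
    mysql.add_instance_to_cluster(
        instance_address=address, instance_unit_label=label, from_instance=primary
    )
```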
2 changes: 1 addition & 1 deletion lib/charms/mysql/v0/tls.py
@@ -52,7 +52,7 @@

LIBID = "eb73947deedd4380a3a90d527e0878eb"
LIBAPI = 0
LIBPATCH = 5
LIBPATCH = 6

SCOPE = "unit"

1 change: 0 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

61 changes: 36 additions & 25 deletions src/charm.py
@@ -41,10 +41,17 @@
from charms.rolling_ops.v0.rollingops import RollingOpsManager
from charms.tempo_k8s.v1.charm_tracing import trace_charm
from charms.tempo_k8s.v2.tracing import TracingEndpointRequirer
from ops import EventBase, RelationBrokenEvent, RelationCreatedEvent, Unit
from ops import EventBase, RelationBrokenEvent, RelationCreatedEvent
from ops.charm import RelationChangedEvent, UpdateStatusEvent
from ops.main import main
from ops.model import ActiveStatus, BlockedStatus, Container, MaintenanceStatus, WaitingStatus
from ops.model import (
ActiveStatus,
BlockedStatus,
Container,
MaintenanceStatus,
Unit,
WaitingStatus,
)
from ops.pebble import Layer

from config import CharmConfig, MySQLConfig
@@ -189,7 +196,7 @@ def tracing_endpoint(self) -> Optional[str]:
def _mysql(self) -> MySQL:
"""Returns an instance of the MySQL object from mysql_k8s_helpers."""
return MySQL(
self._get_unit_fqdn(),
self.get_unit_address(),
self.app_peer_data["cluster-name"],
self.app_peer_data["cluster-set-domain-name"],
self.get_secret("app", ROOT_PASSWORD_KEY), # pyright: ignore [reportArgumentType]
@@ -252,11 +259,7 @@ def restart_peers(self) -> Optional[ops.model.Relation]:
@property
def unit_address(self) -> str:
"""Return the address of this unit."""
return self._get_unit_fqdn()

def get_unit_address(self, unit: Unit) -> str:
"""Return the address of a unit."""
return self._get_unit_fqdn(unit.name)
return self.get_unit_address()

def get_unit_hostname(self, unit_name: Optional[str] = None) -> str:
"""Get the hostname.localdomain for a unit.
@@ -272,17 +275,15 @@ def get_unit_hostname(self, unit_name: Optional[str] = None) -> str:
unit_name = unit_name or self.unit.name
return f"{unit_name.replace('/', '-')}.{self.app.name}-endpoints"

def _get_unit_fqdn(self, unit_name: Optional[str] = None) -> str:
"""Create a fqdn for a unit.
def get_unit_address(self, unit: Optional[Unit] = None) -> str:
"""Get fqdn/address for a unit.
Translate juju unit name to resolvable hostname.
Args:
unit_name: unit name
Returns:
A string representing the fqdn of the unit.
"""
return getfqdn(self.get_unit_hostname(unit_name))
if not unit:
unit = self.unit

return getfqdn(self.get_unit_hostname(unit.name))

def is_unit_busy(self) -> bool:
"""Returns whether the unit is busy."""
Expand All @@ -294,7 +295,7 @@ def _get_primary_from_online_peer(self) -> Optional[str]:
if self.peers.data[unit].get("member-state") == "online":
try:
return self._mysql.get_cluster_primary_address(
connect_instance_address=self._get_unit_fqdn(unit.name),
connect_instance_address=self.get_unit_address(unit),
)
except MySQLGetClusterPrimaryAddressError:
# try next unit
@@ -325,7 +326,7 @@ def join_unit_to_cluster(self) -> None:
Try to join the unit from the primary unit.
"""
instance_label = self.unit.name.replace("/", "-")
instance_address = self._get_unit_fqdn(self.unit.name)
instance_address = self.get_unit_address(self.unit)

if not self._mysql.is_instance_in_cluster(instance_label):
# Add new instance to the cluster
@@ -370,6 +371,21 @@ def join_unit_to_cluster(self) -> None:
# Stop GR for cases where the instance was previously part of the cluster
# harmless otherwise
self._mysql.stop_group_replication()

# If instance already in cluster, before adding instance to cluster,
# remove the instance from the cluster and call rescan_cluster()
# without adding/removing instances to clean up stale users
if (
instance_label
in self._mysql.get_cluster_status(from_instance=cluster_primary)[
"defaultreplicaset"
]["topology"].keys()
):
self._mysql.execute_remove_instance(
connect_instance=cluster_primary, force=True
)
self._mysql.rescan_cluster(from_instance=cluster_primary)

self._mysql.add_instance_to_cluster(
instance_address=instance_address,
instance_unit_label=instance_label,
@@ -385,7 +401,6 @@ def join_unit_to_cluster(self) -> None:
logger.debug("waiting: failed to acquire lock when adding instance to cluster")
return

# Update 'units-added-to-cluster' counter in the peer relation databag
self.unit_peer_data["member-state"] = "online"
self.unit.status = ActiveStatus(self.active_status_message)
logger.debug(f"Instance {instance_label} is cluster member")
@@ -669,7 +684,7 @@ def _on_mysql_pebble_ready(self, event) -> None:
# First run setup
self._configure_instance(container)

if not self.unit.is_leader():
if not self.unit.is_leader() or self.cluster_initialized:
# Non-leader units try to join cluster
self.unit.status = WaitingStatus("Waiting for instance to join the cluster")
self.unit_peer_data.update({"member-role": "secondary", "member-state": "waiting"})
@@ -793,10 +808,6 @@ def _on_update_status(self, _: Optional[UpdateStatusEvent]) -> None:

def _set_app_status(self) -> None:
"""Set the application status based on the cluster state."""
nodes = self._mysql.get_cluster_node_count()
if nodes > 0:
self.app_peer_data["units-added-to-cluster"] = str(nodes)

try:
primary_address = self._mysql.get_cluster_primary_address()
except MySQLGetClusterPrimaryAddressError:
@@ -838,7 +849,7 @@ def _on_database_storage_detaching(self, _) -> None:
logger.info("Switching primary to unit 0")
try:
self._mysql.set_cluster_primary(
new_primary_address=self._get_unit_fqdn(f"{self.app.name}/0")
new_primary_address=getfqdn(self.get_unit_hostname(f"{self.app.name}/0"))
)
except MySQLSetClusterPrimaryError:
logger.warning("Failed to switch primary to unit 0")
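With `_get_unit_fqdn` removed, address resolution is unified in `get_unit_address(unit)` (defaulting to the local unit), while callers that only have a unit name, such as src/upgrade.py below, resolve it via `getfqdn(get_unit_hostname(...))`. A small standalone sketch of the hostname scheme follows; the app and unit names in it are assumptions for illustration.

```python
# Standalone sketch of the hostname/FQDN scheme behind get_unit_hostname() and
# get_unit_address(); the app and unit names are assumed values for illustration.
from socket import getfqdn


def unit_hostname(unit_name: str, app_name: str) -> str:
    # Juju unit "mysql-k8s/0" maps to the headless-service host
    # "mysql-k8s-0.mysql-k8s-endpoints", which getfqdn() then fully qualifies.
    return f"{unit_name.replace('/', '-')}.{app_name}-endpoints"


print(getfqdn(unit_hostname("mysql-k8s/0", "mysql-k8s")))
```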
11 changes: 5 additions & 6 deletions src/upgrade.py
@@ -5,6 +5,7 @@

import json
import logging
from socket import getfqdn
from typing import TYPE_CHECKING

from charms.data_platform_libs.v0.upgrade import (
@@ -174,12 +175,12 @@ def _pre_upgrade_prepare(self) -> None:
"""
if self.charm._mysql.get_primary_label() != f"{self.charm.app.name}-0":
# set the primary to the first unit for switchover mitigation
new_primary = self.charm._get_unit_fqdn(f"{self.charm.app.name}/0")
new_primary = getfqdn(self.charm.get_unit_hostname(f"{self.charm.app.name}/0"))
self.charm._mysql.set_cluster_primary(new_primary)

# set slow shutdown on all instances
for unit in self.app_units:
unit_address = self.charm._get_unit_fqdn(unit.name)
unit_address = self.charm.get_unit_address(unit)
self.charm._mysql.set_dynamic_variable(
variable="innodb_fast_shutdown", value="0", instance_address=unit_address
)
@@ -293,9 +294,7 @@ def _complete_upgrade(self):
if self.charm.unit_label == f"{self.charm.app.name}/1":
# penultimate unit, reset the primary for faster switchover
try:
self.charm._mysql.set_cluster_primary(
self.charm._get_unit_fqdn(self.charm.unit.name)
)
self.charm._mysql.set_cluster_primary(self.charm.get_unit_address(self.charm.unit))
except MySQLSetClusterPrimaryError:
logger.debug("Failed to set primary")

@@ -322,7 +321,7 @@ def _check_server_upgradeability(self) -> None:
if len(self.upgrade_stack or []) < self.charm.app.planned_units():
# check is done for 1st upgrading unit
return
instance = self.charm._get_unit_fqdn(f"{self.charm.app.name}/0")
instance = getfqdn(self.charm.get_unit_hostname(f"{self.charm.app.name}/0"))
self.charm._mysql.verify_server_upgradable(instance=instance)
logger.debug("MySQL server is upgradeable")
