[DPE-5827] Set all nodes to synchronous replicas #784

Open · wants to merge 29 commits into base: main

Changes from 9 commits

Commits (29):
f50bb2c  Update patroni configuration (dragomirp, Nov 20, 2024)
235e19d  Update test assertion (dragomirp, Nov 20, 2024)
30a7f56  Copy update_synchronous_node_count from VM (dragomirp, Nov 21, 2024)
c6339ce  Add unit test (dragomirp, Nov 21, 2024)
9bb565e  Set sync node count during upgrade (dragomirp, Nov 21, 2024)
6e60993  Fix tls test (dragomirp, Nov 21, 2024)
025bfb2  Switchover primary (dragomirp, Nov 21, 2024)
0aa9850  Merge branch 'main' into dpe-5827-all-sync (dragomirp, Nov 21, 2024)
f88c956  Add different helper to get leader (dragomirp, Nov 21, 2024)
39817f6  Add config boilerplate (dragomirp, Nov 29, 2024)
2d5ab35  Merge branch 'main' into dpe-5827-all-sync (dragomirp, Nov 29, 2024)
5752281  Merge branch 'dpe-5827-all-sync' into dpe-5827-all-sync-config (dragomirp, Nov 29, 2024)
43412d1  Use config value when setting sync node count (dragomirp, Nov 29, 2024)
0b1c289  Escape tuple (dragomirp, Nov 29, 2024)
707a014  Add policy values (dragomirp, Dec 2, 2024)
562dad5  Add integration test (dragomirp, Dec 2, 2024)
048e720  Fix casting (dragomirp, Dec 2, 2024)
1f65187  Fix test (dragomirp, Dec 2, 2024)
88541f1  Update to spec (dragomirp, Dec 3, 2024)
95c80de  Bump retry timout (dragomirp, Dec 3, 2024)
a74eae1  Switch to planned units (dragomirp, Dec 3, 2024)
b3e9684  Merge branch 'main' into dpe-5827-all-sync (dragomirp, Dec 5, 2024)
a5f8f2a  Merge branch 'main' into dpe-5827-all-sync (dragomirp, Dec 19, 2024)
0a5916b  Fix generator (dragomirp, Dec 19, 2024)
ac006d1  Merge branch 'main' into dpe-5827-all-sync (dragomirp, Jan 8, 2025)
26c536c  Merge branch 'main' into dpe-5827-all-sync (dragomirp, Jan 8, 2025)
91d533b  Merge branch 'main' into dpe-5827-all-sync (dragomirp, Jan 15, 2025)
372048d  Merge branch 'main' into dpe-5827-all-sync (dragomirp, Jan 21, 2025)
b63c5d1  Merge branch 'main' into dpe-5827-all-sync (dragomirp, Feb 5, 2025)
14 changes: 13 additions & 1 deletion src/charm.py
@@ -441,13 +441,24 @@ def get_unit_ip(self, unit: Unit) -> Optional[str]:
else:
return None

def updated_synchronous_node_count(self, num_units: int | None = None) -> bool:
"""Tries to update synchronous_node_count configuration and reports the result."""
try:
self._patroni.update_synchronous_node_count(num_units)
return True
except RetryError:
logger.debug("Unable to set synchronous_node_count")
return False

def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
"""The leader removes the departing units from the list of cluster members."""
# Allow leader to update endpoints if it isn't leaving.
if not self.unit.is_leader() or event.departing_unit == self.unit:
return

if "cluster_initialised" not in self._peers.data[self.app]:
if "cluster_initialised" not in self._peers.data[
self.app
] or not self.updated_synchronous_node_count(self.app.planned_units()):
logger.debug(
"Deferring on_peer_relation_departed: Cluster must be initialized before members can leave"
)
@@ -774,6 +785,7 @@ def _add_members(self, event) -> None:
for member in self._hosts - self._patroni.cluster_members:
logger.debug("Adding %s to cluster", member)
self.add_cluster_member(member)
self._patroni.update_synchronous_node_count()
except NotReadyError:
logger.info("Deferring reconfigure: another member doing sync right now")
event.defer()
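One plausible reading (an assumption on my part, not stated in the PR) of why the departed-unit handler passes self.app.planned_units() explicitly: at departure time Juju's planned unit count already reflects the post-scale-down size, while the default used inside update_synchronous_node_count (the Patroni members count) may still include the departing unit. A tiny illustration with made-up numbers:

    # Hypothetical counts illustrating the distinction described above.
    planned_units = 2     # Juju's target size after the scale-down
    members_count = 3     # Patroni may still report the departing member

    # Passing planned_units pins the target: every remaining replica stays
    # synchronous once the departure completes.
    print({"synchronous_node_count": planned_units - 1})  # -> 1
    print({"synchronous_node_count": members_count - 1})  # -> 2 (would overshoot)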
24 changes: 23 additions & 1 deletion src/patroni.py
@@ -52,6 +52,10 @@ class SwitchoverFailedError(Exception):
"""Raised when a switchover failed for some reason."""


class UpdateSyncNodeCountError(Exception):
"""Raised when updating synchronous_node_count failed for some reason."""


class Patroni:
"""This class handles the communication with Patroni API and configuration files."""

@@ -125,6 +129,24 @@ def _get_alternative_patroni_url(
url = self._patroni_url
return url

def update_synchronous_node_count(self, units: int | None = None) -> None:
"""Update synchronous_node_count."""
if units is None:
units = self._members_count
# Try to update synchronous_node_count.
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
r = requests.patch(
f"{self._patroni_url}/config",
json={"synchronous_node_count": units - 1},
verify=self._verify,
auth=self._patroni_auth,
)

# Check whether the update was unsuccessful.
if r.status_code != 200:
raise UpdateSyncNodeCountError(f"received {r.status_code}")

def get_primary(self, unit_name_pattern=False, alternative_endpoints: List[str] = None) -> str:
"""Get primary instance.

@@ -489,7 +511,7 @@ def render_patroni_yml_file(
restore_to_latest=restore_to_latest,
stanza=stanza,
restore_stanza=restore_stanza,
minority_count=self._members_count // 2,
synchronous_node_count=self._members_count - 1,
version=self.rock_postgresql_version.split(".")[0],
pg_parameters=parameters,
primary_cluster_endpoint=self._charm.async_replication.get_primary_cluster_endpoint(),
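For reference, a standalone sketch of the REST call that update_synchronous_node_count wraps. The endpoint address, credentials, and unit count below are placeholders; the charm derives them from its own state and TLS settings:

    import requests
    from tenacity import Retrying, stop_after_delay, wait_fixed

    PATRONI_URL = "https://10.1.2.3:8008"   # placeholder unit address
    AUTH = ("patroni", "patroni-password")  # placeholder credentials
    PLANNED_UNITS = 3

    # Retry the PATCH for up to a minute, three seconds between attempts,
    # mirroring the Retrying(stop_after_delay(60), wait_fixed(3)) loop above.
    for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
        with attempt:
            response = requests.patch(
                f"{PATRONI_URL}/config",
                json={"synchronous_node_count": PLANNED_UNITS - 1},
                auth=AUTH,
                verify=False,  # the charm passes its CA bundle instead
                timeout=10,
            )
            # Anything other than 200 means the config was not accepted;
            # raising makes tenacity retry until the delay budget runs out.
            if response.status_code != 200:
                raise Exception(f"received {response.status_code}")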
1 change: 1 addition & 0 deletions src/upgrade.py
@@ -152,6 +152,7 @@ def _on_upgrade_changed(self, event) -> None:
return

self.charm.update_config()
self.charm.updated_synchronous_node_count()

def _on_upgrade_charm_check_legacy(self, event: UpgradeCharmEvent) -> None:
if not self.peer_relation:
2 changes: 1 addition & 1 deletion templates/patroni.yml.j2
@@ -1,7 +1,7 @@
bootstrap:
dcs:
synchronous_mode: true
synchronous_node_count: {{ minority_count }}
synchronous_node_count: {{ synchronous_node_count }}
postgresql:
use_pg_rewind: true
remove_data_directory_on_rewind_failure: true
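A small worked example (assuming a three-unit application; not part of the diff) of what the template change means for the rendered value:

    members_count = 3

    # Old template input: synchronous_node_count was the cluster minority,
    # i.e. one sync standby for a three-unit cluster.
    minority_count = members_count // 2         # 1

    # New template input: every replica is synchronous.
    synchronous_node_count = members_count - 1  # 2

    print(minority_count, synchronous_node_count)  # 1 2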
18 changes: 18 additions & 0 deletions tests/integration/ha_tests/helpers.py
@@ -510,6 +510,24 @@ async def get_postgresql_parameter(ops_test: OpsTest, parameter_name: str) -> Op
return parameter_value


async def get_leader(model: Model, application_name: str) -> str:
"""Get the standby leader name.

Args:
model: the model instance.
application_name: the name of the application to get the value for.

Returns:
the name of the standby leader.
"""
status = await model.get_status()
first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"]
cluster = get_patroni_cluster(first_unit_ip)
for member in cluster["members"]:
if member["role"] == "leader":
return member["name"]
dragomirp (Contributor, PR author) commented:
get_primary from the main helpers is not model aware.



async def get_standby_leader(model: Model, application_name: str) -> str:
"""Get the standby leader name.

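Regarding the comment above about get_primary not being model aware, here is a minimal usage sketch of the new helper across the two models used by the async replication tests. The model names, the application name, and running it as a standalone script from the repository root are assumptions:

    import asyncio

    from juju.model import Model

    from tests.integration.ha_tests.helpers import get_leader


    async def main() -> None:
        first_model, second_model = Model(), Model()
        await first_model.connect("first")    # assumed model names
        await second_model.connect("second")
        try:
            # Because get_leader() takes the Model explicitly, the same helper
            # works against either side of the cross-model deployment.
            print(await get_leader(first_model, "postgresql-k8s"))
            print(await get_leader(second_model, "postgresql-k8s"))
        finally:
            await first_model.disconnect()
            await second_model.disconnect()


    asyncio.run(main())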
16 changes: 8 additions & 8 deletions tests/integration/ha_tests/test_async_replication.py
@@ -31,8 +31,8 @@
from .helpers import (
are_writes_increasing,
check_writes,
get_leader,
get_standby_leader,
get_sync_standby,
start_continuous_writes,
)

@@ -416,11 +416,11 @@ async def test_async_replication_failover_in_main_cluster(
logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test)

sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
logger.info(f"Sync-standby: {sync_standby}")
logger.info("deleting the sync-standby pod")
primary = await get_leader(first_model, DATABASE_APP_NAME)
logger.info(f"Primary: {primary}")
logger.info("deleting the primary pod")
client = Client(namespace=first_model.info.name)
client.delete(Pod, name=sync_standby.replace("/", "-"))
client.delete(Pod, name=primary.replace("/", "-"))

async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
await gather(
@@ -433,9 +433,9 @@
)

# Check that the sync-standby unit is not the same as before.
new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
logger.info(f"New sync-standby: {new_sync_standby}")
assert new_sync_standby != sync_standby, "Sync-standby is the same as before"
dragomirp (Contributor, PR author) commented on the removed lines 432 to 434:
Sync standby will not change, since all the replicas are sync standbys.

new_primary = await get_leader(first_model, DATABASE_APP_NAME)
logger.info(f"New sync-standby: {new_primary}")
assert new_primary != primary, "Sync-standby is the same as before"

logger.info("Ensure continuous_writes after the crashed unit")
await are_writes_increasing(ops_test)
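To make the reasoning in the comment above concrete, a quick sketch (the address is a placeholder) of what the Patroni /cluster endpoint reports once every replica is synchronous, which is why a "new sync-standby" assertion no longer distinguishes anything while a "new primary" assertion still does:

    import requests

    # Placeholder unit address; any cluster member answers /cluster.
    cluster = requests.get("http://10.1.2.3:8008/cluster", timeout=10).json()
    roles = {member["name"]: member["role"] for member in cluster["members"]}

    leaders = [name for name, role in roles.items() if role == "leader"]
    sync_standbys = [name for name, role in roles.items() if role == "sync_standby"]

    # With synchronous_node_count = n - 1, all non-leader members are sync
    # standbys, so deleting one cannot change "the" sync standby; only the
    # leader is unique, hence the test now tracks the primary instead.
    assert len(leaders) == 1
    assert len(sync_standbys) == len(roles) - 1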
3 changes: 1 addition & 2 deletions tests/integration/helpers.py
@@ -769,15 +769,14 @@ async def switchover(
)
assert response.status_code == 200, f"Switchover status code is {response.status_code}"
app_name = current_primary.split("/")[0]
minority_count = len(ops_test.model.applications[app_name].units) // 2
for attempt in Retrying(stop=stop_after_attempt(30), wait=wait_fixed(2), reraise=True):
with attempt:
response = requests.get(f"http://{primary_ip}:8008/cluster")
assert response.status_code == 200
standbys = len([
member for member in response.json()["members"] if member["role"] == "sync_standby"
])
assert standbys >= minority_count
assert standbys == len(ops_test.model.applications[app_name].units) - 1


async def wait_for_idle_on_blocked(
2 changes: 1 addition & 1 deletion tests/integration/test_tls.py
@@ -105,7 +105,7 @@ async def test_tls(ops_test: OpsTest) -> None:
patroni_password = await get_password(ops_test, "patroni")
cluster_info = requests.get(f"https://{primary_address}:8008/cluster", verify=False)
for member in cluster_info.json()["members"]:
if member["role"] == "replica":
if member["role"] != "leader":
replica = "/".join(member["name"].rsplit("-", 1))

# Check if TLS enabled for replication
1 change: 1 addition & 0 deletions tests/unit/test_charm.py
@@ -668,6 +668,7 @@ def test_on_peer_relation_departed(harness):
"charm.PostgresqlOperatorCharm._get_endpoints_to_remove", return_value=sentinel.units
) as _get_endpoints_to_remove,
patch("charm.PostgresqlOperatorCharm._remove_from_endpoints") as _remove_from_endpoints,
patch("charm.PostgresqlOperatorCharm.updated_synchronous_node_count"),
):
# Early exit if not leader
event = Mock()
29 changes: 27 additions & 2 deletions tests/unit/test_patroni.py
@@ -216,7 +216,7 @@ def test_render_patroni_yml_file(harness, patroni):
replication_password=patroni._replication_password,
rewind_user=REWIND_USER,
rewind_password=patroni._rewind_password,
minority_count=patroni._members_count // 2,
synchronous_node_count=patroni._members_count - 1,
version="14",
patroni_password=patroni._patroni_password,
)
@@ -251,7 +251,7 @@ def test_render_patroni_yml_file(harness, patroni):
replication_password=patroni._replication_password,
rewind_user=REWIND_USER,
rewind_password=patroni._rewind_password,
minority_count=patroni._members_count // 2,
synchronous_node_count=patroni._members_count - 1,
version="14",
patroni_password=patroni._patroni_password,
)
@@ -452,3 +452,28 @@ def test_last_postgresql_logs(harness, patroni):
(root / "var" / "log" / "postgresql" / "postgresql.3.log").unlink()
(root / "var" / "log" / "postgresql").rmdir()
assert patroni.last_postgresql_logs() == ""


def test_update_synchronous_node_count(harness, patroni):
with (
patch("patroni.stop_after_delay", return_value=stop_after_delay(0)) as _wait_fixed,
patch("patroni.wait_fixed", return_value=wait_fixed(0)) as _wait_fixed,
patch("requests.patch") as _patch,
):
response = _patch.return_value
response.status_code = 200

patroni.update_synchronous_node_count()

_patch.assert_called_once_with(
"http://postgresql-k8s-0:8008/config",
json={"synchronous_node_count": 2},
verify=True,
auth=patroni._patroni_auth,
)

# Test when the request fails.
response.status_code = 500
with pytest.raises(RetryError):
patroni.update_synchronous_node_count()
assert False
4 changes: 4 additions & 0 deletions tests/unit/test_upgrade.py
@@ -158,6 +158,9 @@ def test_on_upgrade_changed(harness):
with (
patch("charm.PostgresqlOperatorCharm.update_config") as _update_config,
patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started,
patch(
"charm.PostgresqlOperatorCharm.updated_synchronous_node_count"
) as _updated_synchronous_node_count,
):
harness.set_can_connect(POSTGRESQL_CONTAINER, True)
_member_started.return_value = False
@@ -168,6 +171,7 @@
_member_started.return_value = True
harness.charm.on.upgrade_relation_changed.emit(relation)
_update_config.assert_called_once()
_updated_synchronous_node_count.assert_called_once_with()


def test_pre_upgrade_check(harness):