Skip to content

Commit

Permalink
Enable backup on replicas and hide replica from PgBouncer while doing…
Browse files Browse the repository at this point in the history
… backup (#114)

* Initial code for backing up on replicas

* Add initial TLS settings on primary

* Adjust pgBackRest configuration to use standbys

* Implement working TLS server

* Fix TLS errors

* Fix stanza retrieval, restore test and unit test

* Enable Mattermost test

* Fix backup settings check

* Revert S3 settings

* Add TLS check

* Reorganize TLS server logic

* Fix multiple primaries situation

* Remove log calls

* Turn off connectivity only on replicas

* Use planned units when checking whether there are replicas

* Revert last change

* Improve TLS flag documentation

* Improve logic that controls backups on standbys
  • Loading branch information
marceloneppel authored Mar 24, 2023
1 parent 17ecbf4 commit ea781c1
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 60 deletions.
152 changes: 126 additions & 26 deletions src/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,31 @@ def _are_backup_settings_ok(self) -> Tuple[bool, Optional[str]]:
if missing_parameters:
return False, f"Missing S3 parameters: {missing_parameters}"

if "stanza" not in self.charm._peers.data[self.charm.unit]:
return False, "Stanza was not initialised"

return True, None

def _can_unit_perform_backup(self) -> Tuple[bool, Optional[str]]:
"""Validates whether this unit can perform a backup."""
if self.charm.is_blocked:
return False, "Unit is in a blocking state"

if (
self.charm.unit.name == self.charm._patroni.get_primary(unit_name_pattern=True)
and self.charm.app.planned_units() > 1
):
tls_enabled = "tls" in self.charm.unit_peer_data

# Only enable backups on primary if there are replicas but TLS is not enabled.
if self.charm.is_primary and self.charm.app.planned_units() > 1 and tls_enabled:
return False, "Unit cannot perform backups as it is the cluster primary"

# Can create backups on replicas only if TLS is enabled (it's needed to enable
# pgBackRest to communicate with the primary to request that missing WAL files
# are pushed to the S3 repo before the backup action is triggered).
if not self.charm.is_primary and not tls_enabled:
return False, "Unit cannot perform backups as TLS is not enabled"

if not self.charm._patroni.member_started:
return False, "Unit cannot perform backups as it's not in running state"

if "stanza" not in self.charm.app_peer_data:
return False, "Stanza was not initialised"

return self._are_backup_settings_ok()

def _construct_endpoint(self, s3_parameters: Dict) -> str:
Expand Down Expand Up @@ -116,6 +122,11 @@ def _empty_data_files(self) -> None:
)
raise

def _change_connectivity_to_database(self, connectivity: bool) -> None:
"""Enable or disable the connectivity to the database."""
self.charm.unit_peer_data.update({"connectivity": "on" if connectivity else "off"})
self.charm.update_config()

def _execute_command(self, command: List[str]) -> Tuple[str, str]:
"""Execute a command in the workload container."""
return self.container.exec(
Expand Down Expand Up @@ -180,6 +191,9 @@ def _initialise_stanza(self) -> None:
located, how it will be backed up, archiving options, etc. (more info in
https://pgbackrest.org/user-guide.html#quickstart/configure-stanza).
"""
if not self.charm.unit.is_leader():
return

if self.charm.is_blocked:
logger.warning("couldn't initialize stanza due to a blocked status")
return
Expand All @@ -199,7 +213,7 @@ def _initialise_stanza(self) -> None:
return

# Store the stanza name to be used in configurations updates.
self.charm._peers.data[self.charm.unit].update({"stanza": self.charm.cluster_name})
self.charm.app_peer_data.update({"stanza": self.charm.cluster_name})

# Update the configuration to use pgBackRest as the archiving mechanism.
self.charm.update_config()
Expand All @@ -208,7 +222,8 @@ def _initialise_stanza(self) -> None:
# Check that the stanza is correctly configured.
for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)):
with attempt:
self.charm._patroni.reload_patroni_configuration()
if self.charm._patroni.member_started:
self.charm._patroni.reload_patroni_configuration()
self._execute_command(
["pgbackrest", f"--stanza={self.charm.cluster_name}", "check"]
)
Expand All @@ -218,6 +233,32 @@ def _initialise_stanza(self) -> None:
self.charm.unit.status = BlockedStatus(
f"failed to initialize stanza with error {str(e)}"
)
return

return

@property
def _is_primary_pgbackrest_service_running(self) -> bool:
"""Returns whether the pgBackRest TLS server is running in the primary unit."""
try:
primary = self.charm._patroni.get_primary()
except (RetryError, ConnectionError) as e:
logger.error(f"failed to get primary with error {str(e)}")
return False

primary_endpoint = self.charm._get_hostname_from_unit(primary)

try:
self._execute_command(
["pgbackrest", "server-ping", "--io-timeout=10", primary_endpoint]
)
except ExecError as e:
logger.warning(
f"Failed to contact pgBackRest TLS server on {primary_endpoint} with error {str(e)}"
)
return False

return True

def _on_s3_credential_changed(self, event: CredentialsChangedEvent):
"""Call the stanza initialization when the credentials or the connection info change."""
Expand All @@ -226,16 +267,14 @@ def _on_s3_credential_changed(self, event: CredentialsChangedEvent):
event.defer()
return

s3_parameters, missing_parameters = self._retrieve_s3_parameters()
if missing_parameters:
logger.warning(
f"Cannot set pgBackRest configurations due to missing S3 parameters: {missing_parameters}"
)
if not self._render_pgbackrest_conf_file():
logger.debug("Cannot set pgBackRest configurations, missing configurations.")
return

self._render_pgbackrest_conf_file(s3_parameters)
self._initialise_stanza()

self.start_stop_pgbackrest_service()

def _on_create_backup_action(self, event) -> None:
"""Request that pgBackRest creates a backup."""
can_unit_perform_backup, validation_message = self._can_unit_perform_backup()
Expand Down Expand Up @@ -267,17 +306,26 @@ def _on_create_backup_action(self, event) -> None:
event.fail("Failed to upload metadata to provided S3")
return

if not self.charm.is_primary:
# Create a rule to mark the cluster as in a creating backup state and update
# the Patroni configuration.
self._change_connectivity_to_database(connectivity=False)

self.charm.unit.status = MaintenanceStatus("creating backup")

try:
self.charm.unit.status = MaintenanceStatus("creating backup")
stdout, stderr = self._execute_command(
[
"pgbackrest",
f"--stanza={self.charm.cluster_name}",
"--log-level-console=debug",
"--type=full",
"backup",
]
)
command = [
"pgbackrest",
f"--stanza={self.charm.cluster_name}",
"--log-level-console=debug",
"--type=full",
"backup",
]
if self.charm.is_primary:
# Force the backup to run in the primary if it's not possible to run it
# on the replicas (that happens when TLS is not enabled).
command.append("--no-backup-standby")
stdout, stderr = self._execute_command(command)
backup_id = self._list_backups_ids(show_failed=True)[-1]
except ExecError as e:
logger.exception(e)
Expand Down Expand Up @@ -329,6 +377,11 @@ def _on_create_backup_action(self, event) -> None:
else:
event.set_results({"backup-status": "backup created"})

if not self.charm.is_primary:
# Remove the rule the marks the cluster as in a creating backup state
# and update the Patroni configuration.
self._change_connectivity_to_database(connectivity=True)

self.charm.unit.status = ActiveStatus()

def _on_list_backups_action(self, event) -> None:
Expand Down Expand Up @@ -451,15 +504,31 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool:
event.fail(error_message)
return False

logger.info("Checking that this unit was already elected the leader unit")
if not self.charm.unit.is_leader():
error_message = "Unit cannot restore backup as it was not elected the leader unit yet"
logger.warning(error_message)
event.fail(error_message)
return False

return True

def _render_pgbackrest_conf_file(self, s3_parameters: Dict) -> None:
def _render_pgbackrest_conf_file(self) -> bool:
"""Render the pgBackRest configuration file."""
s3_parameters, missing_parameters = self._retrieve_s3_parameters()
if missing_parameters:
logger.warning(
f"Cannot set pgBackRest configurations due to missing S3 parameters: {missing_parameters}"
)
return False

# Open the template pgbackrest.conf file.
with open("templates/pgbackrest.conf.j2", "r") as file:
template = Template(file.read())
# Render the template file with the correct values.
rendered = template.render(
enable_tls=self.charm.is_tls_enabled and len(self.charm.peer_members_endpoints) > 0,
peer_endpoints=self.charm.peer_members_endpoints,
path=s3_parameters["path"],
region=s3_parameters.get("region"),
endpoint=s3_parameters["endpoint"],
Expand All @@ -468,6 +537,7 @@ def _render_pgbackrest_conf_file(self, s3_parameters: Dict) -> None:
access_key=s3_parameters["access-key"],
secret_key=s3_parameters["secret-key"],
stanza=self.charm.cluster_name,
storage_path=self.charm._storage_path,
user=BACKUP_USER,
)
# Delete the original file and render the one with the right info.
Expand All @@ -479,6 +549,8 @@ def _render_pgbackrest_conf_file(self, s3_parameters: Dict) -> None:
group=WORKLOAD_OS_GROUP,
)

return True

def _restart_database(self) -> None:
"""Removes the restoring backup flag and restart the database."""
self.charm.app_peer_data.update({"archive-mode": "", "restoring-backup": ""})
Expand Down Expand Up @@ -510,6 +582,34 @@ def _retrieve_s3_parameters(self) -> Tuple[Dict, List[str]]:

return s3_parameters, []

def start_stop_pgbackrest_service(self) -> bool:
"""Start or stop the pgBackRest TLS server service.
Returns:
a boolean indicating whether the operation succeeded.
"""
# Ignore this operation if backups settings aren't ok.
are_backup_settings_ok, _ = self._are_backup_settings_ok()
if not are_backup_settings_ok:
return True

# Update pgBackRest configuration (to update the TLS settings).
if not self._render_pgbackrest_conf_file():
return False

# Stop the service if TLS is not enabled or there are no replicas.
if not self.charm.is_tls_enabled or len(self.charm.peer_members_endpoints) == 0:
self.container.stop(self.charm.pgbackrest_server_service)
return True

# Don't start the service if the service hasn't started yet in the primary.
if not self.charm.is_primary and not self._is_primary_pgbackrest_service_running:
return False

# Start the service.
self.container.restart(self.charm.pgbackrest_server_service)
return True

def _upload_content_to_s3(
self: str,
content: str,
Expand Down
Loading

0 comments on commit ea781c1

Please sign in to comment.