diff --git a/docs/dynamic_configuration.rst b/docs/dynamic_configuration.rst index f38a8b396..bfcfc3847 100644 --- a/docs/dynamic_configuration.rst +++ b/docs/dynamic_configuration.rst @@ -108,3 +108,7 @@ Note: if cluster topology is static (fixed number of nodes that never change the .. warning:: Permanent replication slots are synchronized only from the ``primary``/``standby_leader`` to replica nodes. That means, applications are supposed to be using them only from the leader node. Using them on replica nodes will cause indefinite growth of ``pg_wal`` on all other nodes in the cluster. An exception to that rule are permanent physical slots that match the Patroni member names, if you happen to configure any. Those will be synchronized among all nodes as they are used for replication among them. + + +.. warning:: + Setting ``nostream`` tag on standby disables copying and synchronization of permanent logical replication slots on the node itself and all its cascading replicas if any. diff --git a/docs/replication_modes.rst b/docs/replication_modes.rst index 0ba5f4b16..3c6fb38e1 100644 --- a/docs/replication_modes.rst +++ b/docs/replication_modes.rst @@ -53,7 +53,7 @@ are available. As a downside, the primary is not be available for writes blocking all client write requests until at least one synchronous replica comes up. -You can ensure that a standby never becomes the synchronous standby by setting ``nosync`` tag to true. This is recommended to set for standbys that are behind slow network connections and would cause performance degradation when becoming a synchronous standby. +You can ensure that a standby never becomes the synchronous standby by setting ``nosync`` tag to true. This is recommended to set for standbys that are behind slow network connections and would cause performance degradation when becoming a synchronous standby. Setting tag ``nostream`` to true will also have the same effect. 
Synchronous mode can be switched on and off via Patroni REST interface. See :ref:`dynamic configuration ` for instructions. diff --git a/docs/yaml_configuration.rst b/docs/yaml_configuration.rst index 92f7445b7..331034d0b 100644 --- a/docs/yaml_configuration.rst +++ b/docs/yaml_configuration.rst @@ -399,6 +399,7 @@ Tags - **nosync**: ``true`` or ``false``. If set to ``true`` the node will never be selected as a synchronous replica. - **nofailover**: ``true`` or ``false``, controls whether this node is allowed to participate in the leader race and become a leader. Defaults to ``false``, meaning this node _can_ participate in leader races. - **failover_priority**: integer, controls the priority that this node should have during failover. Nodes with higher priority will be preferred over lower priority nodes if they received/replayed the same amount of WAL. However, nodes with higher values of receive/replay LSN are preferred regardless of their priority. If the ``failover_priority`` is 0 or negative - such node is not allowed to participate in the leader race and to become a leader (similar to ``nofailover: true``). +- **nostream**: ``true`` or ``false``. If set to ``true`` the node will not use replication protocol to stream WAL. It will rely instead on archive recovery (if ``restore_command`` is configured) and ``pg_wal``/``pg_xlog`` polling. It also disables copying and synchronization of permanent logical replication slots on the node itself and all its cascading replicas. Setting this tag on primary node has no effect. .. warning:: Provide only one of ``nofailover`` or ``failover_priority``. Providing ``nofailover: true`` is the same as ``failover_priority: 0``, and providing ``nofailover: false`` will give the node priority 1. 
diff --git a/patroni/config_generator.py b/patroni/config_generator.py index dcb717c23..cf0b02a52 100644 --- a/patroni/config_generator.py +++ b/patroni/config_generator.py @@ -126,6 +126,7 @@ def get_template_config(cls) -> Dict[str, Any]: 'noloadbalance': False, 'clonefrom': True, 'nosync': False, + 'nostream': False, } } diff --git a/patroni/dcs/__init__.py b/patroni/dcs/__init__.py index 481a8e6c3..21eba1bd6 100644 --- a/patroni/dcs/__init__.py +++ b/patroni/dcs/__init__.py @@ -1039,6 +1039,8 @@ def _get_permanent_slots(self, postgresql: 'Postgresql', tags: Tags, role: str) .. note:: Permanent replication slots are only considered if ``use_slots`` configuration is enabled. A node that is not supposed to become a leader (*nofailover*) will not have permanent replication slots. + Also node with disabled streaming (*nostream*) and its cascading followers must not have permanent + logical slots due to lack of feedback from node to primary, which makes them unsafe to use. In a standby cluster we only support physical replication slots. @@ -1054,7 +1056,7 @@ def _get_permanent_slots(self, postgresql: 'Postgresql', tags: Tags, role: str) if not global_config.use_slots or tags.nofailover: return {} - if global_config.is_standby_cluster: + if global_config.is_standby_cluster or self.get_slot_name_on_primary(postgresql.name, tags) is None: return self.__permanent_physical_slots \ if postgresql.major_version >= SLOT_ADVANCE_AVAILABLE_VERSION or role == 'standby_leader' else {} @@ -1069,6 +1071,10 @@ def _get_members_slots(self, name: str, role: str) -> Dict[str, Dict[str, str]]: the ``replicatefrom`` destination member is currently not a member of the cluster (fallback to the primary), or if ``replicatefrom`` destination member happens to be the current primary. 
+ If the ``nostream`` tag is set on the member - we should not create the replication slot for it on + the current primary or any other member even if ``replicatefrom`` is set, because ``nostream`` disables + WAL streaming. + Will log an error if: * Conflicting slot names between members are found @@ -1083,8 +1089,9 @@ def _get_members_slots(self, name: str, role: str) -> Dict[str, Dict[str, str]]: if not global_config.use_slots: return {} - # we always want to exclude the member with our name from the list - members = filter(lambda m: m.name != name, self.members) + # we always want to exclude the member with our name from the list, + # also exclude members with disabled WAL streaming + members = filter(lambda m: m.name != name and not m.nostream, self.members) if role in ('master', 'primary', 'standby_leader'): members = [m for m in members if m.replicatefrom is None @@ -1172,7 +1179,7 @@ def should_enforce_hot_standby_feedback(self, postgresql: 'Postgresql', member: return any(self.should_enforce_hot_standby_feedback(postgresql, m) for m in members) return False - def get_slot_name_on_primary(self, name: str, tags: Tags) -> str: + def get_slot_name_on_primary(self, name: str, tags: Tags) -> Optional[str]: """Get the name of physical replication slot for this node on the primary. .. note:: @@ -1186,6 +1193,8 @@ def get_slot_name_on_primary(self, name: str, tags: Tags) -> str: :returns: the slot name on the primary that is in use for physical replication on this node. 
""" + if tags.nostream: + return None replicatefrom = self.get_member(tags.replicatefrom, False) if tags.replicatefrom else None return self.get_slot_name_on_primary(replicatefrom.name, replicatefrom) \ if isinstance(replicatefrom, Member) else slot_name_from_member_name(name) diff --git a/patroni/ha.py b/patroni/ha.py index e763905ce..16c050f37 100644 --- a/patroni/ha.py +++ b/patroni/ha.py @@ -607,9 +607,12 @@ def _get_node_to_follow(self, cluster: Cluster) -> Union[Leader, Member, None]: :returns: the node which we should be replicating from. """ + # nostream is set, the node must not use WAL streaming + if self.patroni.nostream: + return None # The standby leader or when there is no standby leader we want to follow # the remote member, except when there is no standby leader in pause. - if self.is_standby_cluster() \ + elif self.is_standby_cluster() \ and (cluster.leader and cluster.leader.name and cluster.leader.name == self.state_handler.name or cluster.is_unlocked() and not self.is_paused()): node_to_follow = self.get_remote_member() diff --git a/patroni/tags.py b/patroni/tags.py index eedc96742..e1e68f267 100644 --- a/patroni/tags.py +++ b/patroni/tags.py @@ -3,13 +3,16 @@ from typing import Any, Dict, Optional -from patroni.utils import parse_int +from patroni.utils import parse_int, parse_bool class Tags(abc.ABC): """An abstract class that encapsulates all the ``tags`` logic. Child classes that want to use provided facilities must implement ``tags`` abstract property. + + .. note:: + Due to backward-compatibility reasons, old tags may have a less strict type conversion than new ones. """ @staticmethod @@ -20,7 +23,7 @@ def _filter_tags(tags: Dict[str, Any]) -> Dict[str, Any]: .. note:: A custom tag is any tag added to the configuration ``tags`` section that is not one of ``clonefrom``, - ``nofailover``, ``noloadbalance`` or ``nosync``. + ``nofailover``, ``noloadbalance``, ``nosync`` or ``nostream``. 
For most of the Patroni predefined tags, the returning object will only contain them if they are enabled as they all are boolean values that default to disabled. @@ -31,7 +34,7 @@ def _filter_tags(tags: Dict[str, Any]) -> Dict[str, Any]: tag value. """ return {tag: value for tag, value in tags.items() - if any((tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync'), + if any((tag not in ('clonefrom', 'nofailover', 'noloadbalance', 'nosync', 'nostream'), value, tag == 'nofailover' and 'failover_priority' in tags))} @@ -89,3 +92,8 @@ def nosync(self) -> bool: def replicatefrom(self) -> Optional[str]: """Value of ``replicatefrom`` tag, if any.""" return self.tags.get('replicatefrom') + + @property + def nostream(self) -> bool: + """``True`` if ``nostream`` is ``True``, else ``False``.""" + return parse_bool(self.tags.get('nostream')) or False diff --git a/patroni/validator.py b/patroni/validator.py index a2818a110..204c81f0d 100644 --- a/patroni/validator.py +++ b/patroni/validator.py @@ -1172,6 +1172,7 @@ def validate_watchdog_mode(value: Any) -> None: Optional("clonefrom"): bool, Optional("noloadbalance"): bool, Optional("replicatefrom"): str, - Optional("nosync"): bool + Optional("nosync"): bool, + Optional("nostream"): bool } }) diff --git a/postgres0.yml b/postgres0.yml index 84796a469..1232ab4a9 100644 --- a/postgres0.yml +++ b/postgres0.yml @@ -136,3 +136,4 @@ tags: noloadbalance: false clonefrom: false nosync: false + nostream: false diff --git a/tests/test_config_generator.py b/tests/test_config_generator.py index c6913da31..e9b82686f 100644 --- a/tests/test_config_generator.py +++ b/tests/test_config_generator.py @@ -142,6 +142,7 @@ def _set_running_instance_config_vals(self): 'noloadbalance': False, 'clonefrom': True, 'nosync': False, + 'nostream': False } } patch_config(self.config, conf) diff --git a/tests/test_ha.py b/tests/test_ha.py index ef3156858..43844c5d9 100644 --- a/tests/test_ha.py +++ b/tests/test_ha.py @@ -151,6 +151,7 @@ def 
__init__(self, p, d): self.api.connection_string = 'http://127.0.0.1:8008' self.clonefrom = None self.nosync = False + self.nostream = False self.scheduled_restart = {'schedule': future_restart_time, 'postmaster_start_time': str(postmaster_start_time)} self.watchdog = Watchdog(self.config) @@ -474,6 +475,11 @@ def test_demote_because_update_lock_failed(self): self.p.is_primary = false self.assertEqual(self.ha.run_cycle(), 'not promoting because failed to update leader lock in DCS') + def test_get_node_to_follow_nostream(self): + self.ha.patroni.nostream = True + self.ha.cluster = get_cluster_initialized_with_leader() + self.assertEqual(self.ha._get_node_to_follow(self.ha.cluster), None) + @patch.object(Cluster, 'is_unlocked', Mock(return_value=False)) def test_follow(self): self.p.is_primary = false diff --git a/tests/test_patroni.py b/tests/test_patroni.py index 2f8428b1c..da65ebaaf 100644 --- a/tests/test_patroni.py +++ b/tests/test_patroni.py @@ -249,6 +249,16 @@ def test_nosync(self): self.p.tags['nosync'] = None self.assertFalse(self.p.nosync) + def test_nostream(self): + self.p.tags['nostream'] = 'True' + self.assertTrue(self.p.nostream) + self.p.tags['nostream'] = 'None' + self.assertFalse(self.p.nostream) + self.p.tags['nostream'] = 'foo' + self.assertFalse(self.p.nostream) + self.p.tags['nostream'] = '' + self.assertFalse(self.p.nostream) + @patch.object(Thread, 'join', Mock()) def test_shutdown(self): self.p.api.shutdown = Mock(side_effect=Exception) diff --git a/tests/test_slots.py b/tests/test_slots.py index 3be2bbd77..a7112f81a 100644 --- a/tests/test_slots.py +++ b/tests/test_slots.py @@ -124,6 +124,68 @@ def test_process_permanent_slots(self): "confirmed_flush_lsn": 12345, "catalog_xmin": 105}])] self.assertEqual(self.p.slots(), {}) + def test_nostream_slot_processing(self): + config = ClusterConfig( + 1, {'slots': {'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}}}, 1) + nostream_node = Member(0, 'test-2', 28, 
{ + 'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', + 'tags': {'nostream': 'True'} + }) + cascade_node = Member(0, 'test-3', 28, { + 'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres', + 'tags': {'replicatefrom': 'test-2'} + }) + stream_node = Member(0, 'test-4', 28, { + 'state': 'running', 'conn_url': 'postgres://replicator:rep-pass@127.0.0.1:5436/postgres'}) + cluster = Cluster( + True, config, self.leader, Status.empty(), + [self.leadermem, nostream_node, cascade_node, stream_node], None, SyncState.empty(), None, None) + global_config.update(cluster) + + # sanity for primary + self.p.name = self.leadermem.name + self.assertEqual( + cluster._get_permanent_slots(self.p, self.leadermem, 'primary'), + {'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}}) + self.assertEqual( + cluster._get_members_slots(self.p.name, 'primary'), + {'test_4': {'type': 'physical'}}) + + # nostream node must not have slot on primary + self.p.name = nostream_node.name + # permanent logical slots are not allowed on nostream node + self.assertEqual( + cluster._get_permanent_slots(self.p, nostream_node, 'replica'), + {'bar': {'type': 'physical'}}) + self.assertEqual( + cluster.get_slot_name_on_primary(self.p.name, nostream_node), + None) + + # check cascade member-slot existence on nostream node + self.assertEqual( + cluster._get_members_slots(nostream_node.name, 'replica'), + {'test_3': {'type': 'physical'}}) + + # cascade is also not entitled to have logical slot on itself ... + self.p.name = cascade_node.name + self.assertEqual( + cluster._get_permanent_slots(self.p, cascade_node, 'replica'), + {'bar': {'type': 'physical'}}) + # ... and member-slot on primary + self.assertEqual( + cluster.get_slot_name_on_primary(self.p.name, cascade_node), + None) + + # simple replica must have every permanent slot ... 
+ self.p.name = stream_node.name + self.assertEqual( + cluster._get_permanent_slots(self.p, stream_node, 'replica'), + {'foo': {'type': 'logical', 'database': 'a', 'plugin': 'b'}, 'bar': {'type': 'physical'}}) + # ... and member-slot on primary + self.assertEqual( + cluster.get_slot_name_on_primary(self.p.name, stream_node), + 'test_4') + @patch.object(Postgresql, 'is_primary', Mock(return_value=False)) def test__ensure_logical_slots_replica(self): self.p.set_role('replica') diff --git a/tests/test_validator.py b/tests/test_validator.py index 1c000c1c5..55f1ef7f0 100644 --- a/tests/test_validator.py +++ b/tests/test_validator.py @@ -103,7 +103,8 @@ "nofailover": False, "clonefrom": False, "noloadbalance": False, - "nosync": False + "nosync": False, + "nostream": False } }