diff --git a/otter/convergence/selfheal.py b/otter/convergence/selfheal.py index a6bd11e71..4d7170a51 100644 --- a/otter/convergence/selfheal.py +++ b/otter/convergence/selfheal.py @@ -2,11 +2,10 @@ Self heal service. """ -from effect import Effect +from effect import ComposedDispatcher, Effect, TypeDispatcher from effect.do import do, do_return from kazoo.exceptions import LockTimeout -from kazoo.protocol.states import KazooState from toolz.curried import filter @@ -14,14 +13,14 @@ from twisted.application.service import MultiService from twisted.internet.defer import inlineCallbacks, returnValue -from txeffect import perform +from txeffect import deferred_performer, perform from otter.convergence.composition import tenant_is_enabled from otter.convergence.service import trigger_convergence from otter.log.intents import with_log from otter.models.intents import GetAllValidGroups, GetScalingGroupInfo from otter.models.interface import ScalingGroupStatus -from otter.util.zk import GetChildren +from otter.util.zk import AcquireLock, GetChildren class SelfHeal(MultiService, object): @@ -57,8 +56,8 @@ def __init__(self, dispatcher, kz_client, interval, log, clock, self.calls = [] timer = TimerService( interval, - lambda: self._converge_all(config_func, interval).addErrback( - self.log.err, "self-heal-convergeall-err")) + lambda: self._setup_if_locked(config_func, interval).addErrback( + self.log.err, "self-heal-setup-err")) timer.clock = clock timer.setServiceParent(self) @@ -89,14 +88,15 @@ def health_check(self): """ Return about whether this object has lock """ - d = is_lock_acquired(self.disp, self.lock) + d = perform(self.disp, is_lock_acquired(self.lock)) return d.addCallback(lambda b: (True, {"has_lock": b})) @inlineCallbacks - def _perform(self, config_func, time_range): + def _setup_convergences(self, config_func, time_range): """ Get groups to converge and setup scheduled calls to trigger convergence - on each of them within time_range + on each of them within time_range. For parameters, see + :func:`__init__` docs. """ groups = yield perform(self.disp, get_groups_to_converge(config_func)) @@ -115,30 +115,55 @@ def _perform(self, config_func, time_range): check_and_trigger(group["tenantId"], group["groupId"])) ) - @inlineCallbacks - def _converge_all(self, config_func, time_range): + def _setup_if_locked(self, config_func, time_range): """ Setup convergence triggering on all groups by acquiring the lock and - calling _perform + calling ``_setup_convergences``. For parameters, see + :func:`__init__` docs. """ - if self.kz_client.state != KazooState.CONNECTED: - self.log.err(RuntimeError("self-heal-kz-state"), - "self-heal-kz-state", state=self.kz_client.state) - returnValue(None) - # The reason why it checks everytime by talking to ZK is because - # we could've lost the lock if there where connection changes between - # subsequent intervals and it is not clear how state change to - # SUSPENDED should be handled - if (yield is_lock_acquired(self.disp, self.lock)): - yield self._perform(config_func, time_range) - else: - try: - yield self.lock.acquire(True, 0.1) - self.log.msg("self-heal-lock-acquired") - yield self._perform(config_func, time_range) - except LockTimeout: - # expected. Nothing to do here. Will try on next interval - pass + class SetupConvergences(object): + pass + + @deferred_performer + def sc_performer(d, i): + return self._setup_convergences(config_func, time_range) + + dispatcher = ComposedDispatcher([ + TypeDispatcher({SetupConvergences: sc_performer}), self.disp]) + + d = perform( + dispatcher, + call_if_acquired(self.lock, Effect(SetupConvergences()))) + return d.addCallback( + lambda b: self.log.msg("self-heal-lock-acquired") if b else None) + + +@do +def call_if_acquired(lock, eff): + """ + Call ``eff`` if ``lock`` is acquired. If not, try to acquire the lock + and call ``eff``. This function is different from + :func:`otter.util.deferredutils.with_lock` where this does not release + the lock after calling ``func``. Also it expects that lock may already be + acquired. + + :param lock: Lock object from :obj:`TxKazooClient` + :param eff: ``Effect`` to call if/when lock is acquired + + :return: True if lock was acquired by this function, False otherwise + :rtype: ``Effect`` of ``bool`` + """ + if (yield is_lock_acquired(lock)): + yield eff + else: + try: + yield Effect(AcquireLock(lock, True, 0.1)) + yield eff + yield do_return(True) + except LockTimeout: + # expected. Nothing to do here. + pass + yield do_return(False) def get_groups_to_converge(config_func): @@ -166,20 +191,10 @@ def check_and_trigger(tenant_id, group_id): tenant_id=tenant_id, scaling_group_id=group_id) -def is_lock_acquired(dispatcher, lock): - """ - Does given lock object has acquired the lock? It does this by getting - all the children and checking if first ephemeral node is ours - - :return: `Deferred` of `bool` - """ - return perform(dispatcher, is_lock_acquired_eff(lock)) - - @do -def is_lock_acquired_eff(lock): +def is_lock_acquired(lock): """ - Does given lock object has acquired the lock? + Is the given lock object currently acquired by this worker? :return: `Effect` of `bool` """ diff --git a/otter/log/spec.py b/otter/log/spec.py index 1a0631e65..095dbe503 100644 --- a/otter/log/spec.py +++ b/otter/log/spec.py @@ -213,7 +213,7 @@ def length_calc(e): "sch-exec-pol-err": "Error executing scheduled policy {policy_id}", # Selfheal service - "self-heal-convergeall-err": ( + "self-heal-setup-err": ( "SelfHeal service errored occurred when scheduling convergence"), "self-heal-calls-err": "SelfHeal service has {active} scheduled calls", "self-heal-kz-state": ( diff --git a/otter/test/convergence/test_selfheal.py b/otter/test/convergence/test_selfheal.py index e6fa97e91..1b5cc03ad 100644 --- a/otter/test/convergence/test_selfheal.py +++ b/otter/test/convergence/test_selfheal.py @@ -2,11 +2,11 @@ Tests for :mod:`otter.convergence.selfheal` """ +from effect import Effect from effect.testing import SequenceDispatcher from kazoo.client import KazooClient from kazoo.exceptions import LockTimeout -from kazoo.protocol.states import KazooState import mock @@ -18,10 +18,11 @@ from otter.log.intents import BoundFields from otter.models.intents import GetAllValidGroups, GetScalingGroupInfo from otter.models.interface import GroupState, ScalingGroupStatus +from otter.test.util.test_zk import ZKCrudModel, ZKLock from otter.test.utils import ( CheckFailure, const, intent_func, mock_log, nested_sequence, noop, patch, perform_sequence, raise_) -from otter.util.zk import GetChildren +from otter.util.zk import AcquireLock, GetChildren class SelfHealTests(SynchronousTestCase): @@ -30,101 +31,40 @@ class SelfHealTests(SynchronousTestCase): """ def setUp(self): - self.lock = mock.Mock(specs=["acquire", "release"]) - self.lock.release.return_value = succeed("released") - self.kzc = mock.Mock(specs=["Lock"]) - self.kzc.state = KazooState.CONNECTED - self.kzc.Lock.return_value = self.lock + self.kzc = ZKCrudModel() self.clock = Clock() self.log = mock_log() self.ggtc = patch( self, "otter.convergence.selfheal.get_groups_to_converge", side_effect=intent_func("ggtc")) - self.ila = patch(self, "otter.convergence.selfheal.is_lock_acquired", - return_value=succeed(True)) self.s = sh.SelfHeal("disp", self.kzc, 300, self.log, self.clock, "cf") - def test_converge_all_lock_not_acquired(self): + def test_setup_again(self): """ - When lock is not acquired, it is tried and if failed does not - call _perform - """ - self.ila.return_value = succeed(False) - self.lock.acquire.return_value = fail(LockTimeout()) - # tests that it is not called - self.s._perform = lambda: 1 / 0 - self.s.startService() - self.ila.assert_called_once_with(self.s.disp, self.lock) - self.lock.acquire.assert_called_once_with(True, 0.1) - - def test_converge_all_lock_acquired(self): - """ - When lock is not acquired, it is tried and if successful self._perform - is called - """ - self.ila.return_value = succeed(False) - self.lock.acquire.return_value = succeed(True) - self.s._perform = mock.Mock() - self.s.startService() - self.ila.assert_called_once_with(self.s.disp, self.lock) - self.lock.acquire.assert_called_once_with(True, 0.1) - self.s._perform.assert_called_once_with("cf", 300) - self.log.msg.assert_called_once_with( - "self-heal-lock-acquired", otter_service="selfheal") - - def test_converge_all_lock_already_acquired(self): - """ - If lock is already acquired, it will just call self._perform - """ - self.s._perform = mock.Mock() - self.s.startService() - self.ila.assert_called_once_with(self.s.disp, self.lock) - # Lock is not acquired again - self.assertFalse(self.lock.acquire.called) - self.s._perform.assert_called_once_with("cf", 300) - - def test_converge_all_kz_not_connected(self): - """ - If kazoo client is not connected then nothing is done + Calls _setup_if_locked at every interval """ - self.kzc.state = KazooState.LOST - - def bad_func(*a): - return 1 / 0 - - self.s._perform = self.ila.side_effect = \ - self.lock.acquire.side_effect = bad_func + self.s._setup_if_locked = mock.Mock() self.s.startService() - self.log.err.assert_called_once_with( - mock.ANY, "self-heal-kz-state", state=KazooState.LOST, - otter_service="selfheal") - - def test_performs_again(self): - """ - Calls _perform at every interval - """ - self.s._perform = mock.Mock() - self.s.startService() - self.s._perform.assert_called_once_with("cf", 300) + self.s._setup_if_locked.assert_called_once_with("cf", 300) self.clock.advance(300) - self.assertEqual(self.s._perform.call_count, 2) + self.assertEqual(self.s._setup_if_locked .call_count, 2) - def test_performs_again_on_err(self): + def test_setup_again_on_err(self): """ - Calls _perform at every interval even it it fails + Calls _setup_if_locked at every interval even it it fails """ - self.s._perform = mock.Mock(return_value=fail(ValueError("h"))) + self.s._setup_if_locked = mock.Mock(return_value=fail(ValueError("h"))) self.s.startService() - self.s._perform.assert_called_once_with("cf", 300) + self.s._setup_if_locked.assert_called_once_with("cf", 300) self.log.err.assert_called_once_with( - CheckFailure(ValueError), "self-heal-convergeall-err", + CheckFailure(ValueError), "self-heal-setup-err", otter_service="selfheal") - self.s._perform.return_value = succeed(None) + self.s._setup_if_locked.return_value = succeed(None) self.clock.advance(300) - self.assertEqual(self.s._perform.call_count, 2) + self.assertEqual(self.s._setup_if_locked.call_count, 2) @mock.patch("otter.convergence.selfheal.check_and_trigger") - def test_perform(self, mock_cat): + def test_setup_convergences(self, mock_cat): """ Gets groups and sets up convergence to be triggered at future time """ @@ -132,6 +72,9 @@ def test_perform(self, mock_cat): for i in range(5)] self.s.disp = SequenceDispatcher([(("ggtc", "cf"), const(groups))]) mock_cat.side_effect = lambda t, g: t + g + # Let _setup_if_locked directly call _setup_convergences to avoid + # going through _setup_if_locked + self.s._setup_if_locked = self.s._setup_convergences self.s.startService() calls = self.clock.getDelayedCalls() # Last call will be for next _convere_all call @@ -141,17 +84,20 @@ def test_perform(self, mock_cat): self.assertEqual(c.func, sh.perform) self.assertEqual(c.args, (self.s.disp, "t{}g{}".format(i, i))) - def test_perform_no_groups(self): + def test_setup_convergences_no_groups(self): """ - Gets groups and doesnt do anything if there are no groups + Gets groups and does nothing if there are no groups """ self.s.disp = SequenceDispatcher([(("ggtc", "cf"), const([]))]) + # Let _setup_if_locked directly call _setup_convergences to avoid + # going through _setup_if_locked + self.s._setup_if_locked = self.s._setup_convergences self.s.startService() self.assertEqual(self.s.calls, []) calls = self.clock.getDelayedCalls() self.assertEqual(len(calls), 1) - def test_perform_still_active(self): + def test_setup_convergences_still_active(self): """ If there are scheduled calls when perform is called, they are cancelled and err is logged. Future calls are scheduled as usual @@ -159,33 +105,40 @@ def test_perform_still_active(self): call1 = self.clock.callLater(1, noop, 2) call2 = self.clock.callLater(2, noop, 3) self.s.calls = [call1, call2] - self.test_perform() + self.test_setup_convergences() self.log.err.assert_called_once_with( mock.ANY, "self-heal-calls-err", active=2, otter_service="selfheal") self.assertFalse(call1.active()) self.assertFalse(call2.active()) - def test_perform_errs(self): + def test_setup_convergences_errs(self): """ If getting groups fails, perform just logs the error """ self.s.disp = SequenceDispatcher([ (("ggtc", "cf"), lambda i: raise_(ValueError("huh")))]) + # Let _setup_if_locked directly call _setup_convergences to avoid + # going through _setup_if_locked + self.s._setup_if_locked = self.s._setup_convergences self.s.startService() self.assertEqual(self.s.calls, []) self.log.err.assert_called_once_with( - CheckFailure(ValueError), "self-heal-convergeall-err", + CheckFailure(ValueError), "self-heal-setup-err", otter_service="selfheal") def test_health_check(self): """ Health check returns about lock being acquired """ + self.patch(sh, "is_lock_acquired", intent_func("ila")) + self.s.disp = SequenceDispatcher([ + (("ila", self.s.lock), const(True))]) self.assertEqual( self.successResultOf(self.s.health_check()), (True, {"has_lock": True})) - self.ila.return_value = succeed(False) + self.s.disp = SequenceDispatcher([ + (("ila", self.s.lock), const(False))]) self.assertEqual( self.successResultOf(self.s.health_check()), (True, {"has_lock": False})) @@ -195,19 +148,79 @@ def test_stop_service(self): `stopService` will stop the timer, cancel any scheduled calls and release lock """ - self.test_perform() + self.s.lock.acquired = True + self.test_setup_convergences() calls = self.s.calls[:] d = self.s.stopService() # calls cancelled self.assertTrue(all(not c.active() for c in calls)) # lock released - self.lock.release.assert_called_with() - self.assertEqual(self.successResultOf(d), "released") + self.assertIsNone(self.successResultOf(d)) + self.assertFalse(self.s.lock.acquired) # timer stopped; having bad dispatcher would raise error if perform # was called again self.s.disp = "bad" self.clock.advance(300) + def test_setup_if_locked(self): + """ + :func:`_setup_if_locked` calls ``self._setup_convergences`` through + ``call_if_acquired`` + """ + + def cia(l, e): + self.assertIs(l, self.s.lock) + return e + + self.patch(sh, "call_if_acquired", cia) + self.s.disp = SequenceDispatcher([]) + self.s._setup_convergences = mock.Mock(return_value=succeed(True)) + d = self.s._setup_if_locked("cf", 35) + self.assertIsNone(self.successResultOf(d)) + self.s._setup_convergences.assert_called_once_with("cf", 35) + self.log.msg.assert_called_once_with( + "self-heal-lock-acquired", otter_service="selfheal") + + +class CallIfAcquiredTests(SynchronousTestCase): + """ + Tests for :func:`call_if_acquired` + """ + def setUp(self): + self.patch(sh, "is_lock_acquired", intent_func("ila")) + + def test_lock_not_acquired(self): + """ + When lock is not acquired, it is tried and if failed does not + call eff + """ + lock = object() + seq = [(("ila", lock), const(False)), + (AcquireLock(lock, True, 0.1), lambda i: raise_(LockTimeout()))] + self.assertFalse( + perform_sequence(seq, sh.call_if_acquired(lock, Effect("call")))) + + def test_lock_acquired(self): + """ + When lock is not acquired, it is tried and if successful calls eff + """ + lock = object() + seq = [(("ila", lock), const(False)), + (AcquireLock(lock, True, 0.1), const(True)), + ("call", noop)] + self.assertTrue( + perform_sequence(seq, sh.call_if_acquired(lock, Effect("call")))) + + def test_lock_already_acquired(self): + """ + If lock is already acquired, it will just call eff + """ + lock = object() + seq = [(("ila", lock), const(True)), + ("call", noop)] + self.assertFalse( + perform_sequence(seq, sh.call_if_acquired(lock, Effect("call")))) + class GetGroupsToConvergeTests(SynchronousTestCase): """ @@ -283,46 +296,37 @@ class IsLockAcquiredTests(SynchronousTestCase): Tests for :func:`is_lock_acquired` and :func:`is_lock_acquired_eff` """ - def test_eff_no_children(self): + def test_no_children(self): """ If lock node does not have any children, it does not have lock """ - lock = mock.Mock(spec=KazooClient, path="/lock") + lock = ZKLock("client", "/lock") seq = [(GetChildren("/lock"), const([]))] - self.assertFalse(perform_sequence(seq, sh.is_lock_acquired_eff(lock))) + self.assertFalse(perform_sequence(seq, sh.is_lock_acquired(lock))) - def test_eff_has_lock(self): + def test_has_lock(self): """ Lock node's first child belongs to given object. Hence has the lock """ prefix = "someprefix__lock__" - lock = mock.Mock(spec=KazooClient, path="/lock", prefix=prefix) + lock = ZKLock("client", "/lock") + lock.prefix = prefix children = ["errrprefix__lock__0000000004", "{}0000000001".format(prefix), "whyprefix__lock__0000000002"] seq = [(GetChildren("/lock"), const(children))] - self.assertTrue(perform_sequence(seq, sh.is_lock_acquired_eff(lock))) + self.assertTrue(perform_sequence(seq, sh.is_lock_acquired(lock))) - def test_eff_no_lock(self): + def test_no_lock(self): """ If lock's node is not the first in the sorted list of children, then it does not have the lock """ prefix = "whyprefix__lock__" - lock = mock.Mock(spec=KazooClient, path="/lock", prefix=prefix) + lock = ZKLock("client", "/lock") + lock.prefix = prefix children = ["errrprefix__lock__0000000004", "someprefix__lock__0000000001", "{}0000000002".format(prefix)] seq = [(GetChildren("/lock"), const(children))] - self.assertFalse(perform_sequence(seq, sh.is_lock_acquired_eff(lock))) - - def test_is_lock_acquired_performs(self): - """ - `is_lock_acquired` just performs the effect returned from - `is_lock_acquired_eff` with given dispatcher - """ - self.patch(sh, "is_lock_acquired_eff", intent_func("ilae")) - disp = SequenceDispatcher([(("ilae", "lock"), const("ret"))]) - self.assertEqual( - self.successResultOf(sh.is_lock_acquired(disp, "lock")), - "ret") + self.assertFalse(perform_sequence(seq, sh.is_lock_acquired(lock))) diff --git a/otter/test/util/test_zk.py b/otter/test/util/test_zk.py index 0d2f814df..a1eed67b9 100644 --- a/otter/test/util/test_zk.py +++ b/otter/test/util/test_zk.py @@ -13,7 +13,7 @@ from otter.test.utils import test_dispatcher from otter.util.zk import ( - CreateOrSet, CreateOrSetLoopLimitReachedError, + AcquireLock, CreateOrSet, CreateOrSetLoopLimitReachedError, DeleteNode, GetChildren, GetChildrenWithStats, GetStat, get_zk_dispatcher, @@ -88,6 +88,29 @@ def exists(self, path): else: return None + def Lock(self, path): + return ZKLock(self, path) + + +class ZKLock(object): + """ + Stub for :obj:`kazoo.recipe.lock.KazooLock` + """ + def __init__(self, client, path): + self.client = client + self.path = path + self.acquired = False + self.acquire_calls = {} + + def acquire(self, blocking=True, timeout=None): + assert not self.acquired + self.acquired = self.acquire_calls[(blocking, timeout)] + return succeed(self.acquired) + + def release(self): + self.acquired = False + return succeed(None) + class CreateOrSetTests(SynchronousTestCase): """Tests for :func:`create_or_set`.""" @@ -245,3 +268,14 @@ def test_delete(self): result = sync_perform(dispatcher, eff) self.assertEqual(model.nodes, {}) self.assertEqual(result, 'delete return value') + + +class AcquireLockTests(SynchronousTestCase): + """Tests for :obj:`AcquireLock`.""" + def test_success(self): + lock = ZKLock("client", "path") + lock.acquire_calls[(True, 0.3)] = True + eff = Effect(AcquireLock(lock, True, 0.3)) + dispatcher = get_zk_dispatcher("client") + result = sync_perform(dispatcher, eff) + self.assertIs(result, True) diff --git a/otter/util/zk.py b/otter/util/zk.py index 79e9abdb2..844b431b4 100644 --- a/otter/util/zk.py +++ b/otter/util/zk.py @@ -1,5 +1,7 @@ from functools import partial +import attr + from characteristic import attributes from effect import Effect, TypeDispatcher, parallel, sync_performer @@ -135,6 +137,24 @@ def perform_delete_node(kz_client, dispatcher, intent): return kz_client.delete(intent.path, version=intent.version) +@attr.s +class AcquireLock(object): + """ + Intent to acquire lock + """ + lock = attr.ib() + blocking = attr.ib(default=True) + timeout = attr.ib(default=None) + + +@deferred_performer +def perform_acquire_lock(dispatcher, intent): + """ + Perform :obj:`AcquireLock`. + """ + return intent.lock.acquire(intent.blocking, intent.timeout) + + def get_zk_dispatcher(kz_client): """Get a dispatcher that can support all of the ZooKeeper intents.""" return TypeDispatcher({ @@ -147,5 +167,6 @@ def get_zk_dispatcher(kz_client): GetChildren: partial(perform_get_children, kz_client), GetStat: - partial(perform_get_stat, kz_client) + partial(perform_get_stat, kz_client), + AcquireLock: perform_acquire_lock })