Skip to content

Commit

Permalink
lock acquired refactoring - thanks @glyph
Browse files Browse the repository at this point in the history
moved calling _perform based on lock to a separate function with
effect. Using ZKCrudModel instead of mock (again thanks to @glyph) in
effect closing #1903. And renamed some functions.
  • Loading branch information
manishtomar committed Jun 29, 2016
1 parent a52b0b7 commit 6e3acf4
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 152 deletions.
99 changes: 57 additions & 42 deletions otter/convergence/selfheal.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,25 @@
Self heal service.
"""

from effect import Effect
from effect import ComposedDispatcher, Effect, TypeDispatcher
from effect.do import do, do_return

from kazoo.exceptions import LockTimeout
from kazoo.protocol.states import KazooState

from toolz.curried import filter

from twisted.application.internet import TimerService
from twisted.application.service import MultiService
from twisted.internet.defer import inlineCallbacks, returnValue

from txeffect import perform
from txeffect import deferred_performer, perform

from otter.convergence.composition import tenant_is_enabled
from otter.convergence.service import trigger_convergence
from otter.log.intents import with_log
from otter.models.intents import GetAllValidGroups, GetScalingGroupInfo
from otter.models.interface import ScalingGroupStatus
from otter.util.zk import GetChildren
from otter.util.zk import AcquireLock, GetChildren


class SelfHeal(MultiService, object):
Expand Down Expand Up @@ -57,8 +56,8 @@ def __init__(self, dispatcher, kz_client, interval, log, clock,
self.calls = []
timer = TimerService(
interval,
lambda: self._converge_all(config_func, interval).addErrback(
self.log.err, "self-heal-convergeall-err"))
lambda: self._setup_if_locked(config_func, interval).addErrback(
self.log.err, "self-heal-setup-err"))
timer.clock = clock
timer.setServiceParent(self)

Expand Down Expand Up @@ -89,14 +88,15 @@ def health_check(self):
"""
Return about whether this object has lock
"""
d = is_lock_acquired(self.disp, self.lock)
d = perform(self.disp, is_lock_acquired(self.lock))
return d.addCallback(lambda b: (True, {"has_lock": b}))

@inlineCallbacks
def _perform(self, config_func, time_range):
def _setup_convergences(self, config_func, time_range):
"""
Get groups to converge and setup scheduled calls to trigger convergence
on each of them within time_range
on each of them within time_range. For parameters, see
:func:`__init__` docs.
"""
groups = yield perform(self.disp,
get_groups_to_converge(config_func))
Expand All @@ -115,30 +115,55 @@ def _perform(self, config_func, time_range):
check_and_trigger(group["tenantId"], group["groupId"]))
)

@inlineCallbacks
def _converge_all(self, config_func, time_range):
def _setup_if_locked(self, config_func, time_range):
"""
Setup convergence triggering on all groups by acquiring the lock and
calling _perform
calling ``_setup_convergences``. For parameters, see
:func:`__init__` docs.
"""
if self.kz_client.state != KazooState.CONNECTED:
self.log.err(RuntimeError("self-heal-kz-state"),
"self-heal-kz-state", state=self.kz_client.state)
returnValue(None)
# The reason why it checks everytime by talking to ZK is because
# we could've lost the lock if there where connection changes between
# subsequent intervals and it is not clear how state change to
# SUSPENDED should be handled
if (yield is_lock_acquired(self.disp, self.lock)):
yield self._perform(config_func, time_range)
else:
try:
yield self.lock.acquire(True, 0.1)
self.log.msg("self-heal-lock-acquired")
yield self._perform(config_func, time_range)
except LockTimeout:
# expected. Nothing to do here. Will try on next interval
pass
class SetupConvergences(object):
pass

@deferred_performer
def sc_performer(d, i):
return self._setup_convergences(config_func, time_range)

dispatcher = ComposedDispatcher([
TypeDispatcher({SetupConvergences: sc_performer}), self.disp])

d = perform(
dispatcher,
call_if_acquired(self.lock, Effect(SetupConvergences())))
return d.addCallback(
lambda b: self.log.msg("self-heal-lock-acquired") if b else None)


@do
def call_if_acquired(lock, eff):
"""
Call ``eff`` if ``lock`` is acquired. If not, try to acquire the lock
and call ``eff``. This function is different from
:func:`otter.util.deferredutils.with_lock` where this does not release
the lock after calling ``func``. Also it expects that lock may already be
acquired.
:param lock: Lock object from :obj:`TxKazooClient`
:param eff: ``Effect`` to call if/when lock is acquired
:return: True if lock was acquired by this function, False otherwise
:rtype: ``Effect`` of ``bool``
"""
if (yield is_lock_acquired(lock)):
yield eff
else:
try:
yield Effect(AcquireLock(lock, True, 0.1))
yield eff
yield do_return(True)
except LockTimeout:
# expected. Nothing to do here.
pass
yield do_return(False)


def get_groups_to_converge(config_func):
Expand Down Expand Up @@ -166,20 +191,10 @@ def check_and_trigger(tenant_id, group_id):
tenant_id=tenant_id, scaling_group_id=group_id)


def is_lock_acquired(dispatcher, lock):
"""
Does given lock object has acquired the lock? It does this by getting
all the children and checking if first ephemeral node is ours
:return: `Deferred` of `bool`
"""
return perform(dispatcher, is_lock_acquired_eff(lock))


@do
def is_lock_acquired_eff(lock):
def is_lock_acquired(lock):
"""
Does given lock object has acquired the lock?
Is the given lock object currently acquired by this worker?
:return: `Effect` of `bool`
"""
Expand Down
2 changes: 1 addition & 1 deletion otter/log/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def length_calc(e):
"sch-exec-pol-err": "Error executing scheduled policy {policy_id}",

# Selfheal service
"self-heal-convergeall-err": (
"self-heal-setup-err": (
"SelfHeal service errored occurred when scheduling convergence"),
"self-heal-calls-err": "SelfHeal service has {active} scheduled calls",
"self-heal-kz-state": (
Expand Down
Loading

0 comments on commit 6e3acf4

Please sign in to comment.