Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor handling of orphaned resources in the coordinator #1580

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 125 additions & 42 deletions labgrid/remote/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import time
from contextlib import contextmanager
import copy
import random

import attr
import grpc
Expand All @@ -26,7 +27,7 @@
from .scheduler import TagSet, schedule
from .generated import labgrid_coordinator_pb2
from .generated import labgrid_coordinator_pb2_grpc
from ..util import atomic_replace, labgrid_version, yaml
from ..util import atomic_replace, labgrid_version, yaml, Timeout


@contextmanager
Expand Down Expand Up @@ -220,7 +221,7 @@ def __init__(self) -> None:
self.load()

self.loop = asyncio.get_running_loop()
for name in ["save", "reacquire", "schedule"]:
for name in ["save", "sync_resources", "schedule"]:
step_func = getattr(self, f"_poll_step_{name}")
task = self.loop.create_task(self.poll(step_func), name=f"coordinator-poll-{name}")
self.poll_tasks.append(task)
Expand All @@ -231,11 +232,11 @@ async def _poll_step_save(self):
with warn_if_slow("save changes", level=logging.DEBUG):
await self.save()

async def _poll_step_reacquire(self):
# try to re-acquire orphaned resources
async def _poll_step_sync_resources(self):
# try to synchronize resources
async with self.lock:
with warn_if_slow("reacquire orphaned resources", limit=3.0):
await self._reacquire_orphaned_resources()
with warn_if_slow("synchronize resources", limit=3.0):
await self._synchronize_resources()

async def _poll_step_schedule(self):
# update reservations
Expand Down Expand Up @@ -638,6 +639,14 @@ async def _acquire_resources(self, place, resources):
if resource.acquired:
return False

for otherplace in self.places.values():
for oldres in otherplace.acquired_resources:
if resource.path == oldres.path:
logging.info(
"Conflicting orphaned resource %s for acquire request for place %s", oldres, place.name
)
return False

# acquire resources
acquired = []
try:
Expand Down Expand Up @@ -692,47 +701,124 @@ async def _release_resources(self, place, resources, callback=True):
except:
logging.exception("failed to publish released resource %s", resource)

async def _reacquire_orphaned_resources(self):
async def _synchronize_resources(self):
assert self.lock.locked()

for place in self.places.values():
changed = False
# fix:
# - a resource is acquired for a place that is not acquired
# * perhaps caused by a resource acquire timeout (during normal lock)
# -> release()
# - a resource is acquired for a place that still has it as orphaned
# * perhaps caused by a resource acquire timeout (during reacquire)
# -> replace orphaned resource
# - a resource is released, but a place still has it as orphaned
# * perhaps caused by a exporter restart
# -> acquire() and replace orphaned resource

acquired_resources = {}
used_resources = {}
orphaned_resources = {}

# find acquired resources
for exporter in self.exporters.values():
for group in exporter.groups.values():
for resource in group.values():
if resource.acquired:
acquired_resources[resource.path] = resource

for idx, resource in enumerate(place.acquired_resources):
# find resources used by places
for place in self.places.values():
for resource in place.acquired_resources:
if not resource.orphaned:
continue
used_resources[resource.path] = resource
else:
orphaned_resources[resource.path] = resource

timeout = Timeout(5.0)

# find resources to be released
to_release = list(acquired_resources.keys() - used_resources.keys() - orphaned_resources.keys())
if to_release:
logging.info("synchronize resources: %s acquired resource(s) should be released", len(to_release))
random.shuffle(to_release) # don't get stuck on a problematic resource
for resource_path in to_release:
if timeout.expired:
continue # release the coordinator lock

resource = acquired_resources[resource_path]
if resource.acquired == "<broken>":
continue
place = self.places.get(resource.acquired)
print(f"should release {resource} for {place}?")

# is the exporter connected again?
exporter = self.get_exporter_by_name(resource.path[0])
if not exporter:
continue
if place is None:
logging.warning("resource %s claims to be acquired by unknown place", resource)
elif not place.acquired:
logging.warning("resource %s claims to be acquired by unacquired place", resource)
else:
continue
try:
await self._release_resources(place, [resource])
del acquired_resources[resource_path]
except Exception:
logging.exception("failed to release unused resource %s", new_resource)
break

# does the resource exist again?
try:
new_resource = exporter.groups[resource.path[1]][resource.path[3]]
except KeyError:
continue
# find orphaned resources to be acquired
to_acquire = list(orphaned_resources.keys() - acquired_resources.keys())
if to_acquire:
logging.info("synchronize resources: %s orphaned resource(s) should be acquired", len(to_acquire))
random.shuffle(to_acquire) # don't get stuck on a problematic resource
for resource_path in to_acquire:
if timeout.expired:
continue # release the coordinator lock

resource = orphaned_resources[resource_path]
if resource.acquired == "<broken>":
continue
place = self.places.get(resource.acquired)
assert place is not None
assert place.acquired
print(f"should acquire {resource} for {place}?")

# is the exporter connected again?
exporter = self.get_exporter_by_name(resource.path[0])
if not exporter:
continue

if new_resource.acquired:
# this should only happen when resources become broken
logging.debug("ignoring acquired/broken resource %s for place %s", new_resource, place.name)
continue
# does the resource exist again?
try:
new_resource = exporter.groups[resource.path[1]][resource.path[3]]
except KeyError:
continue

try:
await self._acquire_resource(place, new_resource)
place.acquired_resources[idx] = new_resource
except Exception:
logging.exception(
"failed to reacquire orphaned resource %s for place %s", new_resource, place.name
)
break

logging.info("reacquired orphaned resource %s for place %s", new_resource, place.name)
changed = True

if changed:
self._publish_place(place)
self.save_later()
if new_resource.acquired:
# this should only happen when resources become broken
logging.warning("ignoring acquired/broken resource %s for place %s", new_resource, place.name)
continue

try:
await self._acquire_resource(place, new_resource)
acquired_resources[new_resource.path] = new_resource
except Exception:
logging.exception("failed to reacquire orphaned resource %s for place %s", new_resource, place.name)
break

# find orphaned resources to be replaced in the places
to_replace = set(orphaned_resources.keys() & acquired_resources.keys())
if to_replace:
logging.info("synchronize resources: %s orphaned resource(s) should be replaced", len(to_replace))
for resource_path in set(orphaned_resources.keys() & acquired_resources.keys()):
oldresource = orphaned_resources[resource_path]
newresource = acquired_resources[resource_path]
assert oldresource.acquired == newresource.acquired

place = self.places.get(newresource.acquired)
assert place is not None
assert place.acquired

idx = place.acquired_resources.index(oldresource)
place.acquired_resources[idx] = newresource

@locked
async def AcquirePlace(self, request, context):
Expand All @@ -755,9 +841,6 @@ async def AcquirePlace(self, request, context):
if not res.owner == username:
await context.abort(grpc.StatusCode.PERMISSION_DENIED, f"Place {name} was not reserved for {username}")

# First try to reacquire orphaned resources to avoid conflicts.
await self._reacquire_orphaned_resources()

# FIXME use the session object instead? or something else which
# survives disconnecting clients?
place.acquired = username
Expand Down
Loading