From 28cab27314573c2af73c785b3455f416436dba38 Mon Sep 17 00:00:00 2001
From: Rouven Czerwinski
Date: Wed, 15 Jan 2025 11:57:05 +0100
Subject: [PATCH 1/2] remote/coordinator: fail acquire for orphaned resources

Check if a place acquire request tries to lock a resource that is
marked as orphaned. This means we no longer need to reacquire orphaned
resources before trying to acquire a place's resources, since we now
refuse to acquire the resources. This avoids long delays on acquire
calls if the exporter responsible for an (unrelated) orphaned resource
doesn't process commands quickly.

Signed-off-by: Rouven Czerwinski
---
 labgrid/remote/coordinator.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/labgrid/remote/coordinator.py b/labgrid/remote/coordinator.py
index a69b857a2..2ef44b289 100644
--- a/labgrid/remote/coordinator.py
+++ b/labgrid/remote/coordinator.py
@@ -638,6 +638,14 @@ async def _acquire_resources(self, place, resources):
             if resource.acquired:
                 return False
 
+            for otherplace in self.places.values():
+                for oldres in otherplace.acquired_resources:
+                    if resource.path == oldres.path:
+                        logging.info(
+                            "Conflicting orphaned resource %s for acquire request for place %s", oldres, place.name
+                        )
+                        return False
+
         # acquire resources
         acquired = []
         try:
@@ -755,9 +763,6 @@ async def AcquirePlace(self, request, context):
             if not res.owner == username:
                 await context.abort(grpc.StatusCode.PERMISSION_DENIED, f"Place {name} was not reserved for {username}")
 
-        # First try to reacquire orphaned resources to avoid conflicts.
-        await self._reacquire_orphaned_resources()
-
         # FIXME use the session object instead? or something else which
         # survives disconnecting clients?
         place.acquired = username

From 109ef0d09c61f4e2621713584cf14ef9367b2357 Mon Sep 17 00:00:00 2001
From: Jan Luebbe
Date: Wed, 15 Jan 2025 16:10:52 +0100
Subject: [PATCH 2/2] remote/coordinator: refactor handling of orphaned resources

Labgrid currently uses bidirectional streams between coordinator and
client/exporter. For the client side this is a good fit, since the
client sends requests and the coordinator can answer directly.

However, for the exporter we have a case where nested calls were used
in the old crossbar infrastructure, namely when re-acquiring a resource
after the exporter was offline but the place was kept acquired. We call
these orphaned resources. They replace the real resource on the
coordinator side until the resource can be reacquired on the respective
exporter after it has restarted.

With crossbar, when seeing the resource update, the coordinator could
directly call the exporter to acquire the resource for the specific
place. This was possible since crossbar did the RPC route handling and
arbitrary services connected to the crossbar could provide RPC calls to
the service.

With GRPC, we are more constrained. Since we only have a single
input/output stream which needs to multiplex different objects, nested
calls are not directly supported, as the exporter side would still wait
for the coordinator to answer its own request. A different approach to
orphaned resource handling is required.

The coordinator now uses a loop where it checks the orphaned resources
and tries to reacquire them if the exporter reappears. This, however,
introduces another problem: the exporter can be under high load, and
thus the acquire request from the coordinator can time out.
In this case, we need to abort the acquisition during a regular lock,
and in the case of an orphaned resource we need to replace the orphaned
resource with the eventually acquired resource from the exporter. We
also need to handle the case where the exporter has an acquired
resource, but the place has been released in the meantime (perhaps due
to a timeout on a normal place acquire); the same poll loop in the
coordinator handles this as well.

All in all, this means that the resource acquired state for each place
is not necessarily consistent on the coordinator at all times, but will
reach an eventually consistent state. This should be sufficient, since
exporter restarts with orphaned resources should be relatively rare.

Signed-off-by: Jan Luebbe
---
 labgrid/remote/coordinator.py | 156 +++++++++++++++++++++++++---------
 1 file changed, 117 insertions(+), 39 deletions(-)

diff --git a/labgrid/remote/coordinator.py b/labgrid/remote/coordinator.py
index 2ef44b289..300f12f02 100644
--- a/labgrid/remote/coordinator.py
+++ b/labgrid/remote/coordinator.py
@@ -8,6 +8,7 @@
 import time
 from contextlib import contextmanager
 import copy
+import random
 
 import attr
 import grpc
@@ -26,7 +27,7 @@
 from .scheduler import TagSet, schedule
 from .generated import labgrid_coordinator_pb2
 from .generated import labgrid_coordinator_pb2_grpc
-from ..util import atomic_replace, labgrid_version, yaml
+from ..util import atomic_replace, labgrid_version, yaml, Timeout
 
 
 @contextmanager
@@ -220,7 +221,7 @@ def __init__(self) -> None:
         self.load()
 
         self.loop = asyncio.get_running_loop()
-        for name in ["save", "reacquire", "schedule"]:
+        for name in ["save", "sync_resources", "schedule"]:
             step_func = getattr(self, f"_poll_step_{name}")
             task = self.loop.create_task(self.poll(step_func), name=f"coordinator-poll-{name}")
             self.poll_tasks.append(task)
@@ -231,11 +232,11 @@ async def _poll_step_save(self):
             with warn_if_slow("save changes", level=logging.DEBUG):
                 await self.save()
 
-    async def _poll_step_reacquire(self):
-        # try to re-acquire orphaned resources
+    async def _poll_step_sync_resources(self):
+        # try to synchronize resources
         async with self.lock:
-            with warn_if_slow("reacquire orphaned resources", limit=3.0):
-                await self._reacquire_orphaned_resources()
+            with warn_if_slow("synchronize resources", limit=3.0):
+                await self._synchronize_resources()
 
     async def _poll_step_schedule(self):
         # update reservations
@@ -700,47 +701,124 @@ async def _release_resources(self, place, resources, callback=True):
             except:
                 logging.exception("failed to publish released resource %s", resource)
 
-    async def _reacquire_orphaned_resources(self):
+    async def _synchronize_resources(self):
         assert self.lock.locked()
 
-        for place in self.places.values():
-            changed = False
+        # fix:
+        # - a resource is acquired for a place that is not acquired
+        #   * perhaps caused by a resource acquire timeout (during normal lock)
+        #   -> release()
+        # - a resource is acquired for a place that still has it as orphaned
+        #   * perhaps caused by a resource acquire timeout (during reacquire)
+        #   -> replace orphaned resource
+        # - a resource is released, but a place still has it as orphaned
+        #   * perhaps caused by an exporter restart
+        #   -> acquire() and replace orphaned resource
+
+        acquired_resources = {}
+        used_resources = {}
+        orphaned_resources = {}
+
+        # find acquired resources
+        for exporter in self.exporters.values():
+            for group in exporter.groups.values():
+                for resource in group.values():
+                    if resource.acquired:
+                        acquired_resources[resource.path] = resource
 
-            for idx, resource in enumerate(place.acquired_resources):
+        # find resources used by places
+        for place in self.places.values():
+            for resource in place.acquired_resources:
                 if not resource.orphaned:
-                    continue
+                    used_resources[resource.path] = resource
+                else:
+                    orphaned_resources[resource.path] = resource
+
+        timeout = Timeout(5.0)
+
+        # find resources to be released
+        to_release = list(acquired_resources.keys() - used_resources.keys() - orphaned_resources.keys())
+        if to_release:
+            logging.info("synchronize resources: %s acquired resource(s) should be released", len(to_release))
+            random.shuffle(to_release)  # don't get stuck on a problematic resource
+        for resource_path in to_release:
+            if timeout.expired:
+                continue  # release the coordinator lock
+
+            resource = acquired_resources[resource_path]
+            if resource.acquired == "":
+                continue
+            place = self.places.get(resource.acquired)
+            print(f"should release {resource} for {place}?")
 
-                # is the exporter connected again?
-                exporter = self.get_exporter_by_name(resource.path[0])
-                if not exporter:
-                    continue
+            if place is None:
+                logging.warning("resource %s claims to be acquired by unknown place", resource)
+            elif not place.acquired:
+                logging.warning("resource %s claims to be acquired by unacquired place", resource)
+            else:
+                continue
+            try:
+                await self._release_resources(place, [resource])
+                del acquired_resources[resource_path]
+            except Exception:
+                logging.exception("failed to release unused resource %s", resource)
+                break
 
-                # does the resource exist again?
-                try:
-                    new_resource = exporter.groups[resource.path[1]][resource.path[3]]
-                except KeyError:
-                    continue
+        # find orphaned resources to be acquired
+        to_acquire = list(orphaned_resources.keys() - acquired_resources.keys())
+        if to_acquire:
+            logging.info("synchronize resources: %s orphaned resource(s) should be acquired", len(to_acquire))
+            random.shuffle(to_acquire)  # don't get stuck on a problematic resource
+        for resource_path in to_acquire:
+            if timeout.expired:
+                continue  # release the coordinator lock
+
+            resource = orphaned_resources[resource_path]
+            if resource.acquired == "":
+                continue
+            place = self.places.get(resource.acquired)
+            assert place is not None
+            assert place.acquired
+            print(f"should acquire {resource} for {place}?")
+
+            # is the exporter connected again?
+            exporter = self.get_exporter_by_name(resource.path[0])
+            if not exporter:
+                continue
 
-                if new_resource.acquired:
-                    # this should only happen when resources become broken
-                    logging.debug("ignoring acquired/broken resource %s for place %s", new_resource, place.name)
-                    continue
+            # does the resource exist again?
+            try:
+                new_resource = exporter.groups[resource.path[1]][resource.path[3]]
+            except KeyError:
+                continue
+
+            if new_resource.acquired:
+                # this should only happen when resources become broken
+                logging.warning("ignoring acquired/broken resource %s for place %s", new_resource, place.name)
+                continue
+
+            try:
+                await self._acquire_resource(place, new_resource)
+                acquired_resources[new_resource.path] = new_resource
+            except Exception:
+                logging.exception("failed to reacquire orphaned resource %s for place %s", new_resource, place.name)
+                break
 
-                try:
-                    await self._acquire_resource(place, new_resource)
-                    place.acquired_resources[idx] = new_resource
-                except Exception:
-                    logging.exception(
-                        "failed to reacquire orphaned resource %s for place %s", new_resource, place.name
-                    )
-                    break
-
-                logging.info("reacquired orphaned resource %s for place %s", new_resource, place.name)
-                changed = True
-
-            if changed:
-                self._publish_place(place)
-                self.save_later()
+        # find orphaned resources to be replaced in the places
+        to_replace = set(orphaned_resources.keys() & acquired_resources.keys())
+        if to_replace:
+            logging.info("synchronize resources: %s orphaned resource(s) should be replaced", len(to_replace))
+        for resource_path in set(orphaned_resources.keys() & acquired_resources.keys()):
+            oldresource = orphaned_resources[resource_path]
+            newresource = acquired_resources[resource_path]
+            assert oldresource.acquired == newresource.acquired
+
+            place = self.places.get(newresource.acquired)
+            assert place is not None
+            assert place.acquired
+
+            idx = place.acquired_resources.index(oldresource)
+            place.acquired_resources[idx] = newresource
 
     @locked
     async def AcquirePlace(self, request, context):
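Note (illustration, not part of the patches above): the reconciliation in
_synchronize_resources() boils down to set arithmetic over resource paths.
The following is a minimal sketch of that grouping under simplified
assumptions; the function name classify(), the 4-tuple example paths and the
sample values are hypothetical and exist only for this illustration.

    # Sketch of the three groups the coordinator poll step acts on.
    # Inputs are plain sets of resource paths; the real coordinator derives
    # them from exporter state and each place's acquired_resources list.
    def classify(acquired_paths, used_paths, orphaned_paths):
        to_release = acquired_paths - used_paths - orphaned_paths  # held by an exporter, wanted by no place
        to_acquire = orphaned_paths - acquired_paths               # orphaned in a place, free again on its exporter
        to_replace = orphaned_paths & acquired_paths               # reacquired, swap out the orphaned placeholder
        return to_release, to_acquire, to_replace

    acquired = {("exporter1", "group1", "NetworkSerialPort", "serial")}
    used = set()
    orphaned = {
        ("exporter1", "group1", "NetworkSerialPort", "serial"),
        ("exporter2", "group1", "USBPowerPort", "power"),
    }
    print(classify(acquired, used, orphaned))
    # nothing to release, exporter2's path still needs to be acquired,
    # exporter1's path is already reacquired and replaces the orphan

Each group corresponds to one pass of the poll step (release, acquire,
replace); the release and acquire passes share a 5 second budget so a single
poll step does not hold the coordinator lock for too long.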