8
8
import time
9
9
from contextlib import contextmanager
10
10
import copy
11
+ import random
11
12
12
13
import attr
13
14
import grpc
26
27
from .scheduler import TagSet , schedule
27
28
from .generated import labgrid_coordinator_pb2
28
29
from .generated import labgrid_coordinator_pb2_grpc
29
- from ..util import atomic_replace , labgrid_version , yaml
30
+ from ..util import atomic_replace , labgrid_version , yaml , Timeout
30
31
31
32
32
33
@contextmanager
@@ -220,7 +221,7 @@ def __init__(self) -> None:
220
221
self .load ()
221
222
222
223
self .loop = asyncio .get_running_loop ()
223
- for name in ["save" , "reacquire " , "schedule" ]:
224
+ for name in ["save" , "sync_resources " , "schedule" ]:
224
225
step_func = getattr (self , f"_poll_step_{ name } " )
225
226
task = self .loop .create_task (self .poll (step_func ), name = f"coordinator-poll-{ name } " )
226
227
self .poll_tasks .append (task )
@@ -231,11 +232,11 @@ async def _poll_step_save(self):
231
232
with warn_if_slow ("save changes" , level = logging .DEBUG ):
232
233
await self .save ()
233
234
234
- async def _poll_step_reacquire (self ):
235
- # try to re-acquire orphaned resources
235
+ async def _poll_step_sync_resources (self ):
236
+ # try to synchronize resources
236
237
async with self .lock :
237
- with warn_if_slow ("reacquire orphaned resources" , limit = 3.0 ):
238
- await self ._reacquire_orphaned_resources ()
238
+ with warn_if_slow ("synchronize resources" , limit = 3.0 ):
239
+ await self ._synchronize_resources ()
239
240
240
241
async def _poll_step_schedule (self ):
241
242
# update reservations
@@ -638,6 +639,14 @@ async def _acquire_resources(self, place, resources):
638
639
if resource .acquired :
639
640
return False
640
641
642
+ for otherplace in self .places .values ():
643
+ for oldres in otherplace .acquired_resources :
644
+ if resource .path == oldres .path :
645
+ logging .info (
646
+ "Conflicting orphaned resource %s for acquire request for place %s" , oldres , place .name
647
+ )
648
+ return False
649
+
641
650
# acquire resources
642
651
acquired = []
643
652
try :
@@ -692,47 +701,124 @@ async def _release_resources(self, place, resources, callback=True):
692
701
except :
693
702
logging .exception ("failed to publish released resource %s" , resource )
694
703
695
- async def _reacquire_orphaned_resources (self ):
704
+ async def _synchronize_resources (self ):
696
705
assert self .lock .locked ()
697
706
698
- for place in self .places .values ():
699
- changed = False
707
+ # fix:
708
+ # - a resource is acquired for a place that is not acquired
709
+ # * perhaps caused by a resource acquire timeout (during normal lock)
710
+ # -> release()
711
+ # - a resource is acquired for a place that still has it as orphaned
712
+ # * perhaps caused by a resource acquire timeout (during reacquire)
713
+ # -> replace orphaned resource
714
+ # - a resource is released, but a place still has it as orphaned
715
+ # * perhaps caused by a exporter restart
716
+ # -> acquire() and replace orphaned resource
717
+
718
+ acquired_resources = {}
719
+ used_resources = {}
720
+ orphaned_resources = {}
721
+
722
+ # find acquired resources
723
+ for exporter in self .exporters .values ():
724
+ for group in exporter .groups .values ():
725
+ for resource in group .values ():
726
+ if resource .acquired :
727
+ acquired_resources [resource .path ] = resource
700
728
701
- for idx , resource in enumerate (place .acquired_resources ):
729
+ # find resources used by places
730
+ for place in self .places .values ():
731
+ for resource in place .acquired_resources :
702
732
if not resource .orphaned :
703
- continue
733
+ used_resources [resource .path ] = resource
734
+ else :
735
+ orphaned_resources [resource .path ] = resource
736
+
737
+ timeout = Timeout (5.0 )
738
+
739
+ # find resources to be released
740
+ to_release = list (acquired_resources .keys () - used_resources .keys () - orphaned_resources .keys ())
741
+ if to_release :
742
+ logging .info ("synchronize resources: %s acquired resource(s) should be released" , len (to_release ))
743
+ random .shuffle (to_release ) # don't get stuck on a problematic resource
744
+ for resource_path in to_release :
745
+ if timeout .expired :
746
+ continue # release the coordinator lock
747
+
748
+ resource = acquired_resources [resource_path ]
749
+ if resource .acquired == "<broken>" :
750
+ continue
751
+ place = self .places .get (resource .acquired )
752
+ print (f"should release { resource } for { place } ?" )
704
753
705
- # is the exporter connected again?
706
- exporter = self .get_exporter_by_name (resource .path [0 ])
707
- if not exporter :
708
- continue
754
+ if place is None :
755
+ logging .warning ("resource %s claims to be acquired by unknown place" , resource )
756
+ elif not place .acquired :
757
+ logging .warning ("resource %s claims to be acquired by unacquired place" , resource )
758
+ else :
759
+ continue
760
+ try :
761
+ await self ._release_resources (place , [resource ])
762
+ del acquired_resources [resource_path ]
763
+ except Exception :
764
+ logging .exception ("failed to release unused resource %s" , resource )
765
+ break
709
766
710
- # does the resource exist again?
711
- try :
712
- new_resource = exporter .groups [resource .path [1 ]][resource .path [3 ]]
713
- except KeyError :
714
- continue
767
+ # find orphaned resources to be acquired
768
+ to_acquire = list (orphaned_resources .keys () - acquired_resources .keys ())
769
+ if to_acquire :
770
+ logging .info ("synchronize resources: %s orphaned resource(s) should be acquired" , len (to_acquire ))
771
+ random .shuffle (to_acquire ) # don't get stuck on a problematic resource
772
+ for resource_path in to_acquire :
773
+ if timeout .expired :
774
+ continue # release the coordinator lock
775
+
776
+ resource = orphaned_resources [resource_path ]
777
+ if resource .acquired == "<broken>" :
778
+ continue
779
+ place = self .places .get (resource .acquired )
780
+ assert place is not None
781
+ assert place .acquired
782
+ print (f"should acquire { resource } for { place } ?" )
783
+
784
+ # is the exporter connected again?
785
+ exporter = self .get_exporter_by_name (resource .path [0 ])
786
+ if not exporter :
787
+ continue
715
788
716
- if new_resource .acquired :
717
- # this should only happen when resources become broken
718
- logging .debug ("ignoring acquired/broken resource %s for place %s" , new_resource , place .name )
719
- continue
789
+ # does the resource exist again?
790
+ try :
791
+ new_resource = exporter .groups [resource .path [1 ]][resource .path [3 ]]
792
+ except KeyError :
793
+ continue
720
794
721
- try :
722
- await self ._acquire_resource (place , new_resource )
723
- place .acquired_resources [idx ] = new_resource
724
- except Exception :
725
- logging .exception (
726
- "failed to reacquire orphaned resource %s for place %s" , new_resource , place .name
727
- )
728
- break
729
-
730
- logging .info ("reacquired orphaned resource %s for place %s" , new_resource , place .name )
731
- changed = True
732
-
733
- if changed :
734
- self ._publish_place (place )
735
- self .save_later ()
795
+ if new_resource .acquired :
796
+ # this should only happen when resources become broken
797
+ logging .warning ("ignoring acquired/broken resource %s for place %s" , new_resource , place .name )
798
+ continue
799
+
800
+ try :
801
+ await self ._acquire_resource (place , new_resource )
802
+ acquired_resources [new_resource .path ] = new_resource
803
+ except Exception :
804
+ logging .exception ("failed to reacquire orphaned resource %s for place %s" , new_resource , place .name )
805
+ break
806
+
807
+ # find orphaned resources to be replaced in the places
808
+ to_replace = set (orphaned_resources .keys () & acquired_resources .keys ())
809
+ if to_replace :
810
+ logging .info ("synchronize resources: %s orphaned resource(s) should be replaced" , len (to_replace ))
811
+ for resource_path in set (orphaned_resources .keys () & acquired_resources .keys ()):
812
+ oldresource = orphaned_resources [resource_path ]
813
+ newresource = acquired_resources [resource_path ]
814
+ assert oldresource .acquired == newresource .acquired
815
+
816
+ place = self .places .get (newresource .acquired )
817
+ assert place is not None
818
+ assert place .acquired
819
+
820
+ idx = place .acquired_resources .index (oldresource )
821
+ place .acquired_resources [idx ] = newresource
736
822
737
823
@locked
738
824
async def AcquirePlace (self , request , context ):
@@ -755,9 +841,6 @@ async def AcquirePlace(self, request, context):
755
841
if not res .owner == username :
756
842
await context .abort (grpc .StatusCode .PERMISSION_DENIED , f"Place { name } was not reserved for { username } " )
757
843
758
- # First try to reacquire orphaned resources to avoid conflicts.
759
- await self ._reacquire_orphaned_resources ()
760
-
761
844
# FIXME use the session object instead? or something else which
762
845
# survives disconnecting clients?
763
846
place .acquired = username
0 commit comments