def__init__(self,**kwargs:Any)->None:
-self.state:dict[str,Any]={}
-self.config:TaskManagerConfig=TaskManagerConfig(**kwargs)
-self.dispatcher:Annotated[
-TaskDispatcher,
-Doc(
-"""
- A dispatcher of task run events.
-
- Register handlers to listen for task run events.
- """
-),
-]=TaskDispatcher()
-self.broker=TaskBroker.from_url(self.config.broker_url)
-self._stack=AsyncExitStack()
+69
def__init__(self,**kwargs:Any)->None:
+self.state:dict[str,Any]={}
+self.config:TaskManagerConfig=TaskManagerConfig(**kwargs)
+self.dispatcher:Annotated[
+TaskDispatcher,
+Doc(
+"""
+ A dispatcher of task run events.
+
+ Register handlers to listen for task run events.
+ """
+),
+]=TaskDispatcher()
+self.broker=TaskBroker.from_url(self.config.broker_url)
+self._stack=AsyncExitStack()
asyncdefexecute(self,task:Task|str,**params:Any)->TaskRun:
-"""Execute a task and wait for it to finish"""
-task_run=self.create_task_run(task,**params)
-awaittask_run.execute()
-returntask_run
+95
asyncdefexecute(self,task:Task|str,**params:Any)->TaskRun:
+"""Execute a task and wait for it to finish"""
+task_run=self.create_task_run(task,**params)
+awaittask_run.execute()
+returntask_run
defregister_task(self,task:Task)->None:
-"""Register a task with the task manager
-
- Only tasks registered can be executed by a task manager
- """
-self.broker.register_task(task)
+108
defregister_task(self,task:Task)->None:
+"""Register a task with the task manager
+
+ Only tasks registered can be executed by a task manager
+ """
+self.broker.register_task(task)
asyncdefqueue(
-self,
-task:str|Task,
-priority:TaskPriority|None=None,
-**params:Any,
-)->TaskRun:
-"""Queue a task for execution
-
- This methods fires two events:
-
- - queue: when the task is about to be queued
- - queued: after the task is queued
- """
-task_run=self.create_task_run(task,priority=priority,**params)
-self.dispatcher.dispatch(task_run)
-task_run.set_state(TaskState.queued)
-awaitself.broker.queue_task(task_run)
-returntask_run
+127
asyncdefqueue(
+self,
+task:str|Task,
+priority:TaskPriority|None=None,
+**params:Any,
+)->TaskRun:
+"""Queue a task for execution
+
+ This methods fires two events:
+
+ - queue: when the task is about to be queued
+ - queued: after the task is queued
+ """
+task_run=self.create_task_run(task,priority=priority,**params)
+self.dispatcher.dispatch(task_run)
+task_run.set_state(TaskState.queued)
+awaitself.broker.queue_task(task_run)
+returntask_run
defregister_async_handler(self,event:str,handler:AsyncHandler)->None:
-"""Register an async handler for a given event
-
- This method is a no op for a TaskManager that is not a worker
- """
+159
defregister_async_handler(self,event:str,handler:AsyncHandler)->None:
+"""Register an async handler for a given event
+
+ This method is a no op for a TaskManager that is not a worker
+ """
defunregister_async_handler(self,event:Event|str)->AsyncHandler|None:
-"""Unregister an async handler for a given event
-
- This method is a no op for a TaskManager that is not a worker
- """
-returnNone
+166
defunregister_async_handler(self,event:Event|str)->AsyncHandler|None:
+"""Unregister an async handler for a given event
+
+ This method is a no op for a TaskManager that is not a worker
+ """
+returnNone
defcli(self,**kwargs:Any)->Any:
-"""Create the task manager command line interface"""
-try:
-fromfluid.scheduler.cliimportTaskManagerCLI
-exceptImportError:
-raiseImportError(
-"TaskManagerCLI is not available - "
-"install with `pip install aio-fluid[cli]`"
-)fromNone
-returnTaskManagerCLI(self,**kwargs)
+177
defcli(self,**kwargs:Any)->Any:
+"""Create the task manager command line interface"""
+try:
+fromfluid.scheduler.cliimportTaskManagerCLI
+exceptImportError:
+raiseImportError(
+"TaskManagerCLI is not available - "
+"install with `pip install aio-fluid[cli]`"
+)fromNone
+returnTaskManagerCLI(self,**kwargs)
defadd_workers(self,*workers:Worker)->None:
-"""add workers to the workers"""
-workers_,_=self._workers.workers_tasks()
-forworkerinworkers:
-ifworkernotinworkers_:
-workers_.append(worker)
+421
defadd_workers(self,*workers:Worker)->None:
+"""add workers to the workers"""
+workers_,_=self._workers.workers_tasks()
+forworkerinworkers:
+ifworkernotinworkers_:
+workers_.append(worker)
@asynccontextmanager
-asyncdefsafe_run(self)->AsyncGenerator:
-"""Context manager to run a worker safely"""
-try:
-yield
-exceptasyncio.CancelledError:
-ifself._force_shutdown:
-# we are shutting down, this is expected
-pass
-raise
-exceptExceptionase:
-reason=f"unhandled exception while running workers: {e}"
-logger.exception(reason)
-asyncio.get_event_loop().call_soon(self.bail_out,reason,2)
-else:
-# worker finished without error
-# make sure we are shutting down
-asyncio.get_event_loop().call_soon(self.bail_out,"worker exit",1)
+363
@asynccontextmanager
+asyncdefsafe_run(self)->AsyncGenerator:
+"""Context manager to run a worker safely"""
+try:
+yield
+exceptasyncio.CancelledError:
+ifself._force_shutdown:
+# we are shutting down, this is expected
+pass
+raise
+exceptExceptionase:
+reason=f"unhandled exception while running workers: {e}"
+logger.exception(reason)
+asyncio.get_event_loop().call_soon(self.bail_out,reason,2)
+else:
+# worker finished without error
+# make sure we are shutting down
+asyncio.get_event_loop().call_soon(self.bail_out,"worker exit",1)
asyncdefexecute(self,task:Task|str,**params:Any)->TaskRun:
-"""Execute a task and wait for it to finish"""
-task_run=self.create_task_run(task,**params)
-awaittask_run.execute()
-returntask_run
+95
asyncdefexecute(self,task:Task|str,**params:Any)->TaskRun:
+"""Execute a task and wait for it to finish"""
+task_run=self.create_task_run(task,**params)
+awaittask_run.execute()
+returntask_run
defregister_task(self,task:Task)->None:
-"""Register a task with the task manager
-
- Only tasks registered can be executed by a task manager
- """
-self.broker.register_task(task)
+108
defregister_task(self,task:Task)->None:
+"""Register a task with the task manager
+
+ Only tasks registered can be executed by a task manager
+ """
+self.broker.register_task(task)
asyncdefqueue(
-self,
-task:str|Task,
-priority:TaskPriority|None=None,
-**params:Any,
-)->TaskRun:
-"""Queue a task for execution
-
- This methods fires two events:
-
- - queue: when the task is about to be queued
- - queued: after the task is queued
- """
-task_run=self.create_task_run(task,priority=priority,**params)
-self.dispatcher.dispatch(task_run)
-task_run.set_state(TaskState.queued)
-awaitself.broker.queue_task(task_run)
-returntask_run
+127
asyncdefqueue(
+self,
+task:str|Task,
+priority:TaskPriority|None=None,
+**params:Any,
+)->TaskRun:
+"""Queue a task for execution
+
+ This methods fires two events:
+
+ - queue: when the task is about to be queued
+ - queued: after the task is queued
+ """
+task_run=self.create_task_run(task,priority=priority,**params)
+self.dispatcher.dispatch(task_run)
+task_run.set_state(TaskState.queued)
+awaitself.broker.queue_task(task_run)
+returntask_run