-
-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathtest-launart.py
110 lines (81 loc) · 2.73 KB
/
test-launart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import asyncio
from launart import Launart, Service
art = Launart()
class TestSrv(Service):
id = "test_srv"
@property
def required(self) -> set[str]:
return set()
@property
def stages(self) -> set[str]:
return {"preparing"}
async def launch(self, manager: Launart):
async with self.stage("preparing"):
print("TestSrv: prepared TestInterface")
class TestService(Service):
id = "test"
@property
def required(self):
return {"test_srv"}
@property
def stages(self) -> set[str]:
return {"preparing", "blocking", "cleanup"}
async def launch(self, manager: Launart):
async with self.stage("preparing"):
print("prepare")
await asyncio.sleep(3)
async with self.stage("blocking"):
print("blocking")
await asyncio.sleep(3)
print("unblocking 1")
async with self.stage("cleanup"):
print("cleanup")
await asyncio.sleep(3)
class Test2(Service):
id = "test2"
@property
def required(self) -> set[str]:
return {"test"}
@property
def stages(self) -> set[str]:
return {"preparing", "blocking", "cleanup"}
async def launch(self, manager: Launart):
async with self.stage("preparing"):
print("prepare2")
async with self.stage("blocking"):
print("blocking")
print("test for sideload")
manager.add_component(TestSideload())
await asyncio.sleep(3)
print("unblocking 2")
# await asyncio.sleep(1)
await manager.components["test_sideload"].status.wait_for("blocking")
print("sideload in blocking, test for active cleanup")
manager.remove_component("test_sideload")
await asyncio.sleep(10)
async with self.stage("cleanup"):
print("cleanup2")
class TestSideload(Service):
id = "test_sideload"
@property
def required(self) -> set[str]:
return set()
@property
def stages(self) -> set[str]:
return {"preparing", "blocking", "cleanup"}
async def launch(self, manager: Launart):
async with self.stage("preparing"):
print("prepare in sideload")
await asyncio.sleep(3)
async with self.stage("blocking"):
print("blocking in sideload")
await asyncio.sleep(3)
print("unblocking in sideload")
# print(manager.taskgroup.blocking_task)
async with self.stage("cleanup"):
print("cleanup in sideload")
await asyncio.sleep(3)
art.add_component(TestSrv())
art.add_component(TestService())
art.add_component(Test2())
art.launch_blocking()