# relax_flow.py
# Builds and submits the "relax" dflow workflow
# (relax-make -> relax-run -> relax-post).
import os
import time

from dflow import Step, Workflow, download_artifact, upload_artifact
from dflow.plugins.dispatcher import DispatcherExecutor
from dflow.python import PythonOPTemplate
# Resource request handed to DispatcherExecutor as `resources_dict` in main().
# NOTE(review): key meanings follow the dispatcher's resources schema —
# confirm against the dispatcher documentation before changing values.
lbg_resource_dict = {
    "number_node": 1,
    "cpu_per_node": 8,
    "gpu_per_node": 1,
    "queue_name": "eosflow_run",
    "group_size": 1,
    # Alternative entry kept from the original file:
    #   "/opt/intel/oneapi/setvars.sh"
    "source_list": ["/opt/deepmd-kit-2.0.1"],
}
# Machine/context settings handed to DispatcherExecutor as `machine_dict` in
# main().
#
# The account fields are read from the environment instead of being written
# into the source:
#   LBG_EMAIL       -> remote_profile["email"]      (default: "******")
#   LBG_PASSWORD    -> remote_profile["password"]   (default: "******")
#   LBG_PROGRAM_ID  -> remote_profile["program_id"] (integer; None when unset)
# The original file carried a bare `****` for program_id, which is not valid
# Python and prevented the module from being imported at all.
_lbg_program_id = os.environ.get("LBG_PROGRAM_ID")

lbg_machine_dict = {
    "batch_type": "Lebesgue",
    "context_type": "LebesgueContext",
    "local_root": "./",
    "remote_profile": {
        "email": os.environ.get("LBG_EMAIL", "******"),
        "password": os.environ.get("LBG_PASSWORD", "******"),
        # None signals "not configured"; a non-numeric value raises ValueError
        # at import time rather than being sent to the service.
        "program_id": int(_lbg_program_id) if _lbg_program_id else None,
        "input_data": {
            "api_version": 2,
            "job_type": "indicate",
            "log_file": "log",
            "grouped": True,
            "job_name": "eosflow_run",
            "disk_size": 100,
            "scass_type": "c12_m92_1 * NVIDIA V100",
            "platform": "ali",
            # Alternative image kept from the original file:
            #   "zhuoyli/dflow_test:eos"
            "image_name": "LBG_DeePMD-kit_2.0.1_v1.1",
            "on_demand": 0,
        },
    },
}
def main():
    """Build, submit and wait for the three-step "relax" workflow.

    Uploads ``param_relax.json``, ``POSCAR`` and ``frozen_model.pb`` as
    artifacts, chains the steps ``relax-make`` -> ``relax-run`` ->
    ``relax-post`` (the run step goes through the dispatcher executor),
    submits the workflow, polls its status once per second until it leaves
    the "Pending"/"Running" states, and finally downloads the
    ``relaxation_finished`` artifact of the ``relax-post`` step.

    Raises:
        RuntimeError: if the workflow does not finish as "Succeeded", if no
            ``relax-post`` step is reported, or if that step did not
            succeed. Explicit raises are used instead of ``assert`` so the
            checks are still enforced under ``python -O``.
    """
    # Project-local OP definitions; imported inside main() so that importing
    # this module does not require them.
    from relaxation import RelaxMake, RelaxPost
    from run_relaxation import RelaxRun

    op_image = "zhuoyli/dflow_test:cn"

    # define dispatcher
    dispatcher_executor = DispatcherExecutor(
        host="127.0.0.1",
        port="2746",
        machine_dict=lbg_machine_dict,
        resources_dict=lbg_resource_dict,
    )

    wf = Workflow(name="relax")

    artifact0 = upload_artifact("param_relax.json")
    artifact1 = upload_artifact("POSCAR")
    artifact2 = upload_artifact("frozen_model.pb")
    print(artifact0)
    print(artifact1)
    print(artifact2)

    step_make = Step(
        name="relax-make",
        template=PythonOPTemplate(RelaxMake, image=op_image),
        artifacts={
            "parameters": artifact0,
            "structure": artifact1,
            "potential": artifact2,
        },
    )
    artifact_target_tasks = step_make.outputs.artifacts["tasks"]

    step_run = Step(
        name="relax-run",
        template=PythonOPTemplate(
            RelaxRun,
            image=op_image,
            command=['python3'],
        ),
        artifacts={"target_tasks": artifact_target_tasks},
        executor=dispatcher_executor,
        util_command=['python3'],
    )
    artifact_out_tasks = step_run.outputs.artifacts["out_tasks"]

    step_post = Step(
        name="relax-post",
        template=PythonOPTemplate(RelaxPost, image=op_image),
        artifacts={
            "parameters": artifact0,
            "result_tasks": artifact_out_tasks,
        },
    )

    wf.add(step_make)
    wf.add(step_run)
    wf.add(step_post)
    wf.submit()

    # Poll until the workflow reaches a terminal state; the last polled value
    # is the one that gets validated, so no extra status query is needed.
    status = wf.query_status()
    while status in ("Pending", "Running"):
        time.sleep(1)
        status = wf.query_status()
    if status != "Succeeded":
        raise RuntimeError(
            "workflow 'relax' finished with status {!r}".format(status)
        )

    post_steps = wf.query_step(name="relax-post")
    if not post_steps:
        raise RuntimeError("no 'relax-post' step reported by the workflow")
    step = post_steps[0]
    if step.phase != "Succeeded":
        raise RuntimeError(
            "step 'relax-post' finished with phase {!r}".format(step.phase)
        )

    download_artifact(step.outputs.artifacts["relaxation_finished"])


if __name__ == "__main__":
    main()