-
Notifications
You must be signed in to change notification settings - Fork 32
/
utils_slurm.py
executable file
·111 lines (91 loc) · 4.09 KB
/
utils_slurm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import copy
import os
from typing import List, Dict
import pandas as pd
from COMPS import AuthManager
from COMPS.Data import QueryCriteria, Simulation
from idmtools.core import ItemType
from idmtools.entities.iplatform import IPlatform
def _get_serialized_filenames(num_cores, timesteps):
if num_cores == 1:
serialized_list = [f"state-{str(timesteps).zfill(5)}.dtk"]
else:
serialized_list = [f"state-{str(timesteps).zfill(5)}-{str(core_count).zfill(3)}.dtk"
for core_count in range(num_cores)]
return serialized_list
def _get_core_counts(sim_id, platform):
# TODO, get num_cores from simulation in slurm
# sim = platform.get_item(sim_id, ItemType.SIMULATION, raw=True)
# sim.refresh(QueryCriteria().select_children('hpc_jobs'))
# num_cores = int(sim.hpc_jobs[-1].configuration.max_cores)
num_cores = 1
return num_cores
def get_workdir_from_simulations(platform: 'IPlatform', comps_simulations: List[Simulation]) -> Dict[str, str]:
"""
Get COMPS filepath
Args:
platform: idmtools Platform
comps_simulations: COMPS Simulations
Returns: dictionary with simid as key and filepath as value
"""
if platform.environment.upper() == "SLURMSTAGE" or platform.environment.upper() == "CALCULON":
mounts = AuthManager.get_environment_macros(platform.environment)['DOCKER_MOUNTS']
mounts = {v[0]: v[1:4] for v in [m.split(';') for m in mounts.split('|')]}
# pretend I'm on Linux and set the Linux mapping environment variables
for k, v in mounts.items():
os.environ[k] = ';'.join([v[0], v[2]])
sim_work_dir = {str(sim.id): sim.hpc_jobs[-1].working_directory for sim in comps_simulations if sim.hpc_jobs}
return sim_work_dir
def create_sim_directory_map(exp_id: str, platform: 'IPlatform'):
"""
Return a dataframe which contains simulation's working_path and tags.
Args:
exp_id: experiment id
platform: idmtools platform
Returns:
dataframe
"""
# Get idmtools Experiment
exp = platform.get_item(exp_id, ItemType.EXPERIMENT, raw=False)
tags_list = []
for sim in exp.simulations:
tags = copy.deepcopy(sim.tags)
tags.update(dict(simid=sim.id))
tags_list.append(tags)
df = pd.DataFrame(tags_list)
if len(df) != len(exp.simulations):
print(f'Warning: not all jobs in {exp_id} succeeded', ':', )
print(len(exp.simulations) - len(df), 'unsucceeded')
simulations = exp.simulations
dir_list = []
for sim in simulations:
dir_dict = {"simid": str(sim.id), "serialized_file_path": platform._op_client.get_directory(sim)}
dir_list.append(dir_dict)
df_dir = pd.DataFrame(dir_list)
df = pd.merge(left=df, right=df_dir, on='simid')
return df
def build_burnin_df(exp_id: str, platform,serialize_days):
"""
return dataframe which contains serialized_file_path, serialized_population_filenames
Args:
exp_id:
platform:
Returns:
dataframe:
Run_Number | Serialization_Time_Steps | task_type | sweep_tag | simid | serialized_file_path|Num_Cores|Serialized_Population_Filenames
Note, Serialized_Population_Filenames depends on n_cores. if n_cores = 2, Serialized_Population_Filenames look
like these: state-00050-000.dtk, state-00050-001.dtk
"""
df = create_sim_directory_map(exp_id, platform)
# add Num_Cores to df
df["Num_Cores"] = df["simid"].apply(_get_core_counts, platform=platform)
#print(list(df.columns))
#print(df.head())
#try:
burnin_length_in_days = serialize_days#int(df["Serialization_Time_Steps"].iloc[0].strip('[]'))
#except AttributeError:
# different versions of pandas save this as either a string or a list
#burnin_length_in_days = df["Serialization_Time_Steps"].iloc[0][-1]
df["Serialized_Population_Filenames"] = df["Num_Cores"].apply(_get_serialized_filenames, timesteps=burnin_length_in_days)
df = df.reset_index(drop=True)
return df