-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcomputations.py
136 lines (118 loc) · 5.19 KB
/
computations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from enum import Enum
from logging import Logger
from pprint import pprint
from typing import List, Tuple, Dict, Optional
import distributed
from dask.delayed import Delayed
from distributed import Future, Client
from artificial_bias_experiments.experiment_utils import create_logger, close_logger
class TimeUnitEnum(Enum):
    """Units in which a timeout duration can be expressed.

    The integer values are plain identifiers for the members; they are not
    conversion factors.
    """

    # Duration is already given in seconds.
    SECONDS = 1
    # Duration is given in minutes.
    MINUTES = 2
    # Duration is given in hours.
    HOURS = 3
class TimeoutUnit:
    """Convert a duration expressed in a ``TimeUnitEnum`` unit into seconds."""

    def get_timeout_limit_in_seconds(self, duration: float, duration_unit: TimeUnitEnum) -> int:
        """Return ``duration`` converted to a whole number of seconds.

        Args:
            duration: Length of the timeout, expressed in ``duration_unit``.
            duration_unit: Unit in which ``duration`` is given.

        Returns:
            The timeout in seconds. The converted value is passed through
            ``int()``, so any fractional second is truncated.

        Raises:
            ValueError: If ``duration_unit`` is not one of the supported
                ``TimeUnitEnum`` members. Previously this case fell off the
                end of the method and silently returned ``None``.
        """
        if duration_unit == TimeUnitEnum.SECONDS:
            return int(duration)
        if duration_unit == TimeUnitEnum.MINUTES:
            return int(duration * 60)
        if duration_unit == TimeUnitEnum.HOURS:
            return int(duration * 3600)
        # Fail loudly instead of handing back None as a "timeout".
        raise ValueError(f"Unsupported duration unit: {duration_unit!r}")
# def compute_delayed_functions_old(
# list_of_computations: List[Tuple[Delayed, Dict]],
# client: Client,
# nb_of_retries_if_erred: int,
# error_logger_name: str,
# error_logger_file_name: str
# ) -> None:
# print("start compute")
# print(list_of_computations)
#
# list_of_delayed_function_calls: List[Delayed] = [computation[0] for computation in list_of_computations]
#
# list_of_futures: List[Future] = client.compute(list_of_delayed_function_calls, retries=nb_of_retries_if_erred)
# distributed.wait(list_of_futures)
# print("end compute")
#
# error_logger: Logger = create_logger(logger_name=error_logger_name, log_file_name=error_logger_file_name)
# future: Future
# for future, (delayed, func_args) in zip(list_of_futures, list_of_computations):
# if future.status == 'error':
# exception = future.exception()
# error_logger.error(f"{exception.__class__}: {exception}\n"
# f"\tfor arguments {func_args}"
# )
# close_logger(error_logger)
# def compute_delayed_functions_old(
# list_of_computations: List[Tuple[Delayed, Dict]],
# client: Client,
# nb_of_retries_if_erred: int,
# error_logger_name: str,
# error_logger_file_name: str,
# timeout_s: Optional[int]=None
# ) -> None:
#
# error_logger: Logger = create_logger(logger_name=error_logger_name, log_file_name=error_logger_file_name)
# try:
# print("start compute")
# print(list_of_computations)
#
# list_of_delayed_function_calls: List[Delayed] = [computation[0] for computation in list_of_computations]
#
# list_of_futures: List[Future] = client.compute(list_of_delayed_function_calls, retries=nb_of_retries_if_erred)
# # distributed.as_completed()
# distributed.wait(list_of_futures)
# print("end compute")
# error_logger: Logger = create_logger(logger_name=error_logger_name, log_file_name=error_logger_file_name)
# future: Future
# for future, (delayed, func_args) in zip(list_of_futures, list_of_computations):
# if future.status == 'error':
# exception = future.exception()
# error_logger.error(f"{exception.__class__}: {exception}\n"
# f"\tfor arguments {func_args}"
# )
# finally:
# close_logger(error_logger)
def compute_delayed_functions(
        list_of_computations: List[Tuple[Delayed, Dict]],
        client: Client,
        nb_of_retries_if_erred: int,
        error_logger_name: str,
        error_logger_file_name: str,
        timeout_s: Optional[int] = None
) -> None:
    """Submit delayed computations to ``client`` and report each one as it completes.

    Every ``Delayed`` is submitted with ``client.compute``. The resulting
    futures are consumed through ``distributed.as_completed``; a future whose
    status is ``'error'`` has its exception and arguments written to the error
    logger, any other future is printed as finished. Each future is released
    once it has been reported.

    Args:
        list_of_computations: Pairs of (delayed call, arguments describing that
            call). The arguments are only used for printing and error reporting.
        client: Client the delayed calls are submitted to.
        nb_of_retries_if_erred: Passed as ``retries`` to ``client.compute``.
        error_logger_name: Logger name handed to ``create_logger``.
        error_logger_file_name: Log file name handed to ``create_logger``.
        timeout_s: Currently unused; accepted for interface compatibility only.

    Returns:
        None. Results of the computations are not collected.
    """
    error_logger: Logger = create_logger(logger_name=error_logger_name, log_file_name=error_logger_file_name)
    try:
        print("start compute")
        print(list_of_computations)

        # Arguments of every computation that has not been reported yet,
        # keyed by the key of its Delayed object.
        future_key_to_args_map: Dict[str, Dict] = {}
        list_of_delayed_function_calls: List[Delayed] = []

        computation: Delayed
        arguments: Dict
        for computation, arguments in list_of_computations:
            print("Computation key", computation.key)
            pprint(arguments)
            print("---")
            list_of_delayed_function_calls.append(computation)
            future_key_to_args_map[computation.key] = arguments

        list_of_futures: List[Future] = client.compute(
            list_of_delayed_function_calls, retries=nb_of_retries_if_erred
        )

        completed_future: Future
        for completed_future in distributed.as_completed(list_of_futures):
            # NOTE(review): assumes each future carries the same key as the
            # Delayed it was created from - confirm for the dask version in use.
            # pop() with a default keeps the loop alive if a key repeats or is
            # unknown, instead of raising KeyError and skipping the remaining
            # futures; the arguments are then reported as None.
            func_args = future_key_to_args_map.pop(completed_future.key, None)
            if completed_future.status == 'error':
                exception = completed_future.exception()
                error_logger.error(f"{exception.__class__}: {exception}\n"
                                   f"\tfor arguments {func_args}"
                                   )
                print(f"Erred computation key {completed_future.key}")
            else:
                # Any status other than 'error' is reported through this branch.
                print(f"Finished computation key {completed_future.key} corresponding to")
                pprint(func_args)
            print("---")
            # Drop the client-side reference now that the future has been reported.
            completed_future.release()
    finally:
        # Close the error logger even when submission or reporting raised.
        close_logger(error_logger)