-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
135 lines (103 loc) · 4.24 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import itertools
import numpy as np
import pandas as pd
def convert_arrays_to_lists(d: dict):
for key, value in d.items():
if isinstance(value, np.ndarray):
d[key] = value.tolist()
return d
def connectivity_mat(edges_data: pd.DataFrame, from_col: str = 'from', to_col: str = 'to', direction='', param=''):
n_edges = len(edges_data)
n_nodes = pd.concat([edges_data[from_col], edges_data[to_col]]).nunique()
mat = np.zeros((n_nodes, n_edges))
mat[edges_data.loc[:, from_col], np.arange(n_edges)] = -1
mat[edges_data.loc[:, to_col], np.arange(n_edges)] = 1
if direction == 'in':
mat[mat == -1] = 0
if direction == 'out':
mat[mat == 1] = 0
if param:
# row-wise multiplication
mat = mat * edges_data[param].values
return mat
def get_mat_for_type(data: pd.DataFrame, category_data: pd.DataFrame, inverse=False):
"""
generate a matrix that can be multiplied by nodes / edges vector to get nodes / edges of certain type
returns a NxN matrix that is based on an eye matrix where only nodes / edges from the requested type are 1
N is the number of nodes / edges
inverse - to return all nodes /edges beside the input type
data: pd.DataFrame - probably one of: wds.nodes, wds.pipes, pds.bus, pds.lines
"""
# idx = np.where(data['type'] == element_type, 1, 0)
# print(idx)
idx = np.where(data.index.isin(category_data.index.to_list()), 1, 0)
if inverse:
# to get a matrix of all types but the input one
idx = np.logical_not(idx)
mat = idx * np.eye(len(data))
return mat
def get_dt_mat(n):
"""
generate a matrix for representing difference between following time steps
for example, change in tank volume: dv = v2 - v1
if dv is positive (v2 > v1) water flow from the network into the tank and vice versa
"""
mat = np.eye(n, k=0) - np.eye(n, k=1)
return mat
def linear_coefficients_from_two_points(p1, p2):
a = (p2[1] - p1[1]) / (p2[0] - p1[0])
b = p1[1] - a * p1[0]
return a, b
def normalize_mat(mat):
return (mat - np.min(mat)) / (np.max(mat) - np.min(mat))
def get_subsets_of_size(elements, subsets_size):
return [list(combination) for combination in itertools.combinations(elements, subsets_size)]
def get_subsets_of_max_size(elements, max_subset_size, include_empty=False):
"""
Generate all possible combinations of elements in a list,
with each combination's size limited to max_subset_size or less.
Parameters:
elements (list): A list of elements for which the combinations are to be generated.
max_subset_size (int): The maximum size of the subset combinations to be included in the output.
include_empty (bool): Add an empty set or not
Returns:
list of lists: A list where each sublist is a unique combination of elements from the input list,
with each sublist's size being max_subset_size or smaller, including the empty combination.
"""
if include_empty or max_subset_size == 0:
all_combinations = [[]]
else:
all_combinations = []
for r in range(1, min(len(elements), max_subset_size) + 1):
all_combinations.extend(itertools.combinations(elements, r))
return [list(comb) for comb in all_combinations]
def adjust_time_window(df, start_time, duration, time_axis=0):
if time_axis == 1:
df = df.T
end_time = start_time + duration
total_rows_needed = start_time + duration
extra_rows_needed = max(0, total_rows_needed - len(df))
initial_part = df.iloc[start_time:min(len(df), start_time + duration)]
repeated_part = pd.concat([df] * ((extra_rows_needed // len(df)) + 2), ignore_index=True).iloc[:extra_rows_needed]
adjusted_df = pd.concat([initial_part, repeated_part], ignore_index=True)
adjusted_df.index = np.arange(start=start_time, stop=end_time)
if time_axis == 1:
adjusted_df = adjusted_df.T
return adjusted_df
GRB_STATUS = {
1: 'LOADED',
2: 'OPTIMAL',
3: 'INFEASIBLE',
4: 'INF_OR_UNBD',
5: 'UNBOUNDED',
6: 'CUTOFF',
7: 'ITERATION_LIMIT',
8: 'NODE_LIMIT',
9: 'TIME_LIMIT',
10: 'SOLUTION_LIMIT',
11: 'INTERRUPTED',
12: 'NUMERIC',
13: 'SUBOPTIMAL',
14: 'INPROGRESS',
15: 'USER_OBJ_LIMIT'
}