-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathCompletePolicyGraph.py
More file actions
150 lines (118 loc) · 5.97 KB
/
CompletePolicyGraph.py
File metadata and controls
150 lines (118 loc) · 5.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from Code.Utils.utils import get_assigned_color, Actions
import networkx as nx
import numpy as np
from Explainability.PolicyGraph import PolicyGraph
class CompletePolicyGraph(PolicyGraph):
    """ Record all the transitions.
    This algorithm builds a multi-directed graph. For each state, it takes all the agent interactions.
    Therefore, we add them all to our graph. This means that for each node, we have multiple possible actions.
    We think this could be an interesting approach since we are maintaining stochasticity. Probably this algorithm
    can work better in more complex layouts where decision-making is more relative.
    """
    # Folder where serialized MDP models are stored (path relative to the project root).
    MDP_MODELS_FOLDER = 'Code/MDP_Models/'

    def __init__(self, ego_file, alt_file, discretizer, name, layout):
        """
        Constructor.
        :param ego_file: Trajectory/model file for the ego agent (forwarded to PolicyGraph).
        :param alt_file: Trajectory/model file for the alternate agent (forwarded to PolicyGraph).
        :param discretizer: Object that converts OvercookedStates into discrete predicates.
        :param name: Identifier for this policy graph.
        :param layout: Environment layout name.

        Attributes set here:
            pg: networkx.MultiDiGraph holding the MDP; a node may have several
                outgoing edges to the same neighbour, one per action.
        Attributes inherited from PolicyGraph (presumably set by super().__init__):
            frequencies: counts how many times each (state, action, next_state)
                transition was observed -- TODO confirm against base class.
        """
        super().__init__(ego_file, alt_file, discretizer, name, layout)
        self.pg = nx.MultiDiGraph(name='MDP')

    # MDP Generation
    def build_mdp(self, verbose=False):
        """
        Takes frequencies and builds the Markov Decision Graph.
        In this case, we add all the possible edges: for every observed
        (state, action, next_state) triple, an edge is added whose weight is the
        frequency of that triple divided by the TOTAL number of transitions
        leaving `state` (across all actions). Each node can therefore have
        multiple edges with different actions.
        :param verbose: Kept for interface compatibility; currently unused here.
        """
        # Rebuild from scratch so repeated calls do not accumulate stale edges.
        self.pg = nx.MultiDiGraph(name='MDP')
        for state, actions in self.frequencies.items():
            # Total number of observed transitions leaving this state (over all actions).
            sum_total = sum(freq
                            for next_states in actions.values()
                            for freq in next_states.values())
            for action, next_states in actions.items():
                # color: so pyvis can render each action's edges distinctly.
                color = get_assigned_color(action)
                for next_state, freq in next_states.items():
                    # Edge weight = probability of this transition relative to
                    # every transition leaving `state`.
                    self.update_edge(state, next_state, int(action), freq / sum_total, color)
            self.normalize_node_weights(state)
        # Human-readable labels for pyvis node rendering.
        nx.set_node_attributes(self.pg,
                               {node: self.discretizer.get_predicate_label(node)
                                for node in self.frequencies},
                               name='label')

    def select_action_using_mdp(self, predicate, verbose):
        """
        Given a predicate, goes to the MDP and selects the corresponding action.
        Since each node can have multiple possible actions, we can face this problem in different ways:
        - Take the most probable action
        - Take one action using its probability distribution
        Here we used the second one, but other options could work
        :param predicate: Existent or non-existent state
        :param verbose: Prints additional information
        :return: Action
        """
        # Predicate does not exist: record that we fell back to a nearest state.
        if predicate not in self.pg.nodes:
            self.pg_metrics['new_state'] += 1
        nearest_predicate = self.get_nearest_predicate(predicate, verbose=verbose)
        possible_actions = self.get_possible_actions(nearest_predicate)
        # Split (action, probability) pairs into the two parallel lists
        # np.random.choice needs.
        p = [prob for _, prob in possible_actions]
        a = [action.value for action, _ in possible_actions]
        sum_prob = sum(p)
        # Sanity check: the distribution should already sum to ~1 (float tolerance).
        assert 0.999 < sum_prob < 1.001, f"{sum_prob} - {p}"
        # Re-normalize to remove residual float error: np.random.choice
        # requires the probabilities to sum to exactly 1.
        p = np.array(p)
        p /= p.sum()
        rand_action = np.random.choice(a, p=p)
        return Actions(rand_action)

    def get_most_probable_option(self, predicate, verbose):
        """
        Given a predicate, goes to the MDP and selects the corresponding action.
        Since each node can have multiple possible actions, we can face this problem in different ways:
        - Take the most probable action
        - Take one action using its probability distribution
        Here we take the FIRST one: the single action with the highest probability
        (contrast with select_action_using_mdp, which samples from the distribution).
        :param predicate: Existent or non-existent state
        :param verbose: Prints additional information
        :return: Action
        """
        nearest_predicate = self.get_nearest_predicate(predicate, verbose=verbose)
        possible_actions = self.get_possible_actions(nearest_predicate)
        # Stable sort ascending by probability; the last element is the most
        # probable action (on ties, the one appearing last in possible_actions).
        possible_actions = sorted(possible_actions, key=lambda x: x[1])
        return possible_actions[-1][0]

    def update_edge(self, u, v, a, w, color):
        """
        Insert or refresh the (u, v) edge carrying action `a`.
        If an edge with that action already exists its weight is overwritten
        with `w`; otherwise a new parallel edge is added.
        :param u: Source node (state).
        :param v: Target node (next state).
        :param a: Action identifier (int).
        :param w: Transition probability to store as the edge weight.
        :param color: Edge color for pyvis rendering.
        :return: True (always; callers do not distinguish update from insert).
        """
        # get_edge_data returns a dict keyed by parallel-edge index, or None.
        edges = self.pg.get_edge_data(u, v)
        if edges is not None:
            for edge in edges.values():
                # The edge for this action already exists: just refresh its weight.
                if edge['action'] == a:
                    # edge['weight'] = (alpha * w) + ((1 - alpha) * edge['weight'])
                    edge['weight'] = w
                    return True
        # Edge does not exist for this action: add a new parallel edge.
        self.pg.add_edge(u, v, action=a, weight=w, color=color)
        return True