forked from faif/python-patterns
-
Notifications
You must be signed in to change notification settings - Fork 1
/
hsm.py
177 lines (127 loc) · 5.07 KB
/
hsm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
"""
Implementation of the HSM (hierarchical state machine) or
NFSM (nested finite state machine) C++ example from
http://www.eventhelix.com/RealtimeMantra/HierarchicalStateMachine.htm#.VwqLVEL950w
in Python
- single source 'message type' for state transition changes
- message type considered, messages (comment) not considered to avoid complexity
"""
class UnsupportedMessageType(BaseException):
pass
class UnsupportedState(BaseException):
pass
class UnsupportedTransition(BaseException):
pass
class HierachicalStateMachine:
def __init__(self):
self._active_state = Active(self) # Unit.Inservice.Active()
self._standby_state = Standby(self) # Unit.Inservice.Standby()
self._suspect_state = Suspect(self) # Unit.OutOfService.Suspect()
self._failed_state = Failed(self) # Unit.OutOfService.Failed()
self._current_state = self._standby_state
self.states = {
"active": self._active_state,
"standby": self._standby_state,
"suspect": self._suspect_state,
"failed": self._failed_state,
}
self.message_types = {
"fault trigger": self._current_state.on_fault_trigger,
"switchover": self._current_state.on_switchover,
"diagnostics passed": self._current_state.on_diagnostics_passed,
"diagnostics failed": self._current_state.on_diagnostics_failed,
"operator inservice": self._current_state.on_operator_inservice,
}
def _next_state(self, state):
try:
self._current_state = self.states[state]
except KeyError:
raise UnsupportedState
def _send_diagnostics_request(self):
return "send diagnostic request"
def _raise_alarm(self):
return "raise alarm"
def _clear_alarm(self):
return "clear alarm"
def _perform_switchover(self):
return "perform switchover"
def _send_switchover_response(self):
return "send switchover response"
def _send_operator_inservice_response(self):
return "send operator inservice response"
def _send_diagnostics_failure_report(self):
return "send diagnostics failure report"
def _send_diagnostics_pass_report(self):
return "send diagnostics pass report"
def _abort_diagnostics(self):
return "abort diagnostics"
def _check_mate_status(self):
return "check mate status"
def on_message(self, message_type): # message ignored
if message_type in self.message_types.keys():
self.message_types[message_type]()
else:
raise UnsupportedMessageType
class Unit:
def __init__(self, HierachicalStateMachine):
self.hsm = HierachicalStateMachine
def on_switchover(self):
raise UnsupportedTransition
def on_fault_trigger(self):
raise UnsupportedTransition
def on_diagnostics_failed(self):
raise UnsupportedTransition
def on_diagnostics_passed(self):
raise UnsupportedTransition
def on_operator_inservice(self):
raise UnsupportedTransition
class Inservice(Unit):
def __init__(self, HierachicalStateMachine):
self._hsm = HierachicalStateMachine
def on_fault_trigger(self):
self._hsm._next_state("suspect")
self._hsm._send_diagnostics_request()
self._hsm._raise_alarm()
def on_switchover(self):
self._hsm._perform_switchover()
self._hsm._check_mate_status()
self._hsm._send_switchover_response()
class Active(Inservice):
def __init__(self, HierachicalStateMachine):
self._hsm = HierachicalStateMachine
def on_fault_trigger(self):
super().perform_switchover()
super().on_fault_trigger()
def on_switchover(self):
self._hsm.on_switchover() # message ignored
self._hsm.next_state("standby")
class Standby(Inservice):
def __init__(self, HierachicalStateMachine):
self._hsm = HierachicalStateMachine
def on_switchover(self):
super().on_switchover() # message ignored
self._hsm._next_state("active")
class OutOfService(Unit):
def __init__(self, HierachicalStateMachine):
self._hsm = HierachicalStateMachine
def on_operator_inservice(self):
self._hsm.on_switchover() # message ignored
self._hsm.send_operator_inservice_response()
self._hsm.next_state("suspect")
class Suspect(OutOfService):
def __init__(self, HierachicalStateMachine):
self._hsm = HierachicalStateMachine
def on_diagnostics_failed(self):
super().send_diagnostics_failure_report()
super().next_state("failed")
def on_diagnostics_passed(self):
super().send_diagnostics_pass_report()
super().clear_alarm() # loss of redundancy alarm
super().next_state("standby")
def on_operator_inservice(self):
super().abort_diagnostics()
super().on_operator_inservice() # message ignored
class Failed(OutOfService):
"""No need to override any method."""
def __init__(self, HierachicalStateMachine):
self._hsm = HierachicalStateMachine