This repository has been archived by the owner on Sep 29, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathm_daemon.py
440 lines (358 loc) · 12.3 KB
/
m_daemon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
import asyncore
import re as re
import time
from datetime import datetime
import numpy as np
import utils.socket_subs as socket_subs
import utils.visa_subs as visa_subs
class MControl:
"""
Initialization call, initialize visas for the Mercury IPS and perform some startup
queries on the instrument
server, server always runs at 18861
Important parameters
field
heater
a_to_b (amps to tesla)
lock - lock the deamon from performing actions, typically if the heater has just been
switched
The target current either as part of a sweep or going to a fixed value
Mode: Sweep or Set (including set to zero)
TODO: modify codes to allow X, Y Z axes
"""
def __init__(self):
# Connect visa to the magnet
self.visa = visa_subs.initialize_serial("ASRL11::INSTR")
# Add Timeout to 200s
self.visa.timeout = 200000
# Open the socket
address = ('localhost', 18861)
self.server = socket_subs.SockServer(address)
# Define some important parameters for the magnet
self.field = 0.0
self.source_current = 0.0
self.heater = False
self.magnet_current = 0.0
self.a_to_b = 0.0
self.rate = 2.19
self.max_rate = 2.19
self.current_limit = 0.0
# Set up the lock for the switch heater
self.lock = False
self.lock_time = 0.0
# The magnet actions are defined by the following parameters.
# The daemon tries to reach the target field and then put the heater into the target state
self.target_field = 0.0
self.target_heater = False
self.sweep_now = False
self.ready = 1 # ready message which is also broadcast to the listener
return
def magnet_read_numeric(self, command):
"""Function to read one of the numeric signals"""
# Form the query string (Now only for GRPZ)
query = "".join(("READ:DEV:GRPZ:PSU:SIG:", command))
reply = self.visa.query(query)
# Find the useful part of the response
answer = str.rsplit(reply, ":", 1)[1]
# Some regex to get rid of the appended units
answer = re.split("[a-zA-Z]", answer, 1)[0]
answer = float(answer)
return answer
def magnet_read_field(self):
"""Function to read the field in Tesla specifically"""
# Form the query string (Now only for GRPZ)
if self.heater:
query = "READ:DEV:GRPZ:PSU:SIG:FLD"
else:
# For some reason the command PFLD doesn't work
query = "READ:DEV:GRPZ:PSU:SIG:PCUR"
reply = self.visa.query(query)
# Find the useful part of the response
answer = str.rsplit(reply, ":", 1)[1]
# Some regex to get rid of the appended units
answer = re.split("[a-zA-Z]", answer, 1)[0]
answer = float(answer)
if self.heater:
self.source_current = answer * self.a_to_b
self.magnet_current = self.source_current
else:
self.magnet_current = answer
answer = answer / self.a_to_b
self.field = answer
return
def magnet_read_conf_numeric(self, command):
"""Read one of the numeric configs"""
# Form the query string (Now only for GRPZ)
query = "".join(("READ:DEV:GRPZ:PSU:", command))
reply = self.visa.query(query)
# Find the useful part of the response
answer = str.rsplit(reply, ":", 1)[1]
# Some regex to get rid of the appended units
answer = re.split("[a-zA-Z]", answer, 1)[0]
answer = float(answer)
return answer
def magnet_set_numeric(self, command, value):
"""Function to set one of the numeric signals"""
# Form the query string (Now only for GRPZ)
write_command = "SET:DEV:GRPZ:PSU:SIG:%s:%.4f" % (command, value)
reply = self.visa.query(write_command)
answer = str.rsplit(reply, ":", 1)[1]
if answer == "VALID":
valid = 1
elif answer == "INVALID":
valid = 0
else:
valid = -1
return valid
def magnet_read_heater(self):
"""Function to read the switch heater state returns boolean"""
reply = self.visa.query("READ:DEV:GRPZ:PSU:SIG:SWHT")
answer = str.rsplit(reply, ":", 1)[1]
if answer == "ON":
valid = 1
self.heater = True
elif answer == "OFF":
valid = 0
self.heater = False
else:
valid = -1
return valid
def magnet_set_heater(self, state):
"""Turn the switch heater ON (1) or OFF (0)"""
self.magnet_set_action("HOLD")
heater_before = self.heater
if state:
reply = self.visa.query("SET:DEV:GRPZ:PSU:SIG:SWHT:ON")
else:
reply = self.visa.query("SET:DEV:GRPZ:PSU:SIG:SWHT:OFF")
answer = str.rsplit(reply, ":", 1)[1]
valid = 0
if answer == "VALID":
valid = 1
elif answer == "INVALID":
valid = 0
time.sleep(5.)
self.magnet_read_heater()
heater_after = self.heater
if heater_after != heater_before:
print("heater switched ... locking for 2 minutes...")
self.lock = True
self.lock_time = datetime.now()
return valid
def magnet_read_action(self):
""" Read the current magnet action e.g. HOLD, RTOZ etc."""
reply = self.visa.query("READ:DEV:GRPZ:PSU:ACTN")
answer = str.rsplit(reply, ":", 1)[1]
return answer
def magnet_set_action(self, command):
"""Set the action for the magnet"""
reply = self.visa.query("".join(("SET:DEV:GRPZ:PSU:ACTN:", command)))
answer = str.rsplit(reply, ":", 1)[1]
if answer == "VALID":
valid = 1
elif answer == "INVALID":
valid = 0
else:
valid = -1
return valid
def magnet_check_switchable(self):
"""Check if it is safe to switch the switch heater"""
self.magnet_read_heater()
self.source_current = self.magnet_read_numeric("CURR")
self.magnet_current = self.magnet_read_numeric("PCUR")
switchable = False
if self.heater:
switchable = True
elif abs(self.source_current - self.magnet_current) <= 0.1:
switchable = True
elif self.heater == 0 and abs(self.source_current - self.magnet_current) >= 0.1:
switchable = False
action = self.magnet_read_action()
if action == "RTOZ" or action == "RTOS":
switchable = False
return switchable
def magnet_on_start_up(self):
"""On start get parameters"""
# Check the heater
self.magnet_read_heater()
self.a_to_b = self.magnet_read_conf_numeric("ATOB")
# Take care of the field sourcecurrent and magnetcurrent
self.magnet_read_field()
self.current_limit = self.magnet_read_conf_numeric("CLIM")
self.target_field = self.field
self.target_heater = self.heater
if self.heater:
heater_string = "ON"
else:
heater_string = "OFF"
print(
f"Connected to magnet... heater is {heater_string}, field is {self.field:.4f}, "
f"Magnet conversion = {self.a_to_b:.4f} A/T, Maximum current = {self.current_limit:.3f}"
)
return
def set_source(self, new_set):
"""Set the leads current, ignore the switch heater state, busy etc"""
if abs(new_set) <= self.current_limit:
c_set = new_set
else:
c_set = np.copysign(self.current_limit, new_set)
self.magnet_set_numeric("CSET", c_set)
# If the heater is on set the rate
if self.heater:
if self.rate >= self.max_rate:
self.rate = self.max_rate
self.magnet_set_numeric("RCST", self.rate)
set_rate = self.magnet_read_numeric("RCST")
self.magnet_set_action("RTOS")
print(f"Ramping source to {c_set:.4f} A at {set_rate:.4f} A/m\n")
return
def query_at_target(self):
if abs(self.target_field) < 1.0:
if abs(self.field-self.target_field) < 0.0003:
at_target = True
else:
at_target = False
else:
if abs((self.field-self.target_field)/self.target_field) <= 0.00015:
at_target = True
else:
at_target = False
return at_target
def update_ready(self):
if self.query_at_target() and (self.heater == self.target_heater):
# The system is at target and ready
self.ready = 1
else:
# Idle
self.ready = 0
return
def read_msg(self, msg):
"""Interpret a message from the socket
There are two possible actionable calls to the daemon
1. "SET" go to set point
2. "SWP" sweep from the current field to a target
"""
msg = msg.decode() # change in python 3
msg = msg.split(" ")
if msg[0] == "SET":
# Set message has form "SET target_field target_heater"
try:
new_field = float(msg[1])
new_heater = int(msg[2])
new_heater = bool(new_heater)
if (new_field != self.target_field) or (new_heater != self.target_heater):
self.target_field = new_field
self.target_heater = new_heater
self.rate = self.max_rate
self.update_ready()
if not self.ready:
print(f"Got new set point from socket {self.target_field:.4f} T")
except:
pass
if msg[0] == "SWP":
# Message has form "SWP target_field rate target_heater"
# print msg
try:
new_field = float(msg[1])
new_heater = int(msg[3])
new_heater = bool(new_heater)
self.rate = float(msg[2]) * self.a_to_b
if (new_field != self.target_field) or (new_heater != self.target_heater):
self.target_field = new_field
self.target_heater = new_heater
self.update_ready()
if not self.ready:
print(
f"Got new sweep point from socket to {self.target_field:.4f} T"
f" at {self.rate/self.a_to_b:.4f} T/min"
)
except:
pass
return
if __name__ == '__main__':
# Initialize a daemon instance and runs startup codes
control = MControl()
control.magnet_on_start_up()
# A flag to control the source behavior
source_flag = False
while 1:
# Read the field and update the ready message
control.magnet_read_field()
# Push the reading to clients
for j in control.server.handlers:
j.to_send = f",{control.field:.5f} {control.ready:d}".encode()
socket_msg = j.received_data
if socket_msg and socket_msg != "-":
control.read_msg(socket_msg)
asyncore.loop(count=1, timeout=0.001)
""" Now we should do stuff depending on the socket and what we
were doing before reading the socket
In order of precedence
1. We are locked, waiting for the switch heater -> delay any actions
2. Go to the target field
3. Go to the target heater
4. ... just chill out!
"""
if control.lock:
# Check if we can release the lock
wait = datetime.now() - control.lock_time
if wait.seconds >= 120.0:
# Unlock
control.lock = False
print("Unlocking...")
if not control.lock and not control.ready:
""" The magnet is not locked and not ready
We now try to go to the target_field and
set the target_heater
"""
if not control.query_at_target():
# System is not at the target field
if not control.heater:
# The heater is not on
if control.magnet_check_switchable():
# The switch heater can be switched ON --> so switch it ON
control.magnet_set_heater(True)
# this will set the lock so we need to get out of the loop without doing anything else
else:
# The switch heater is not on
action = control.magnet_read_action()
if action != "RTOS":
# The source is not ramping --> Ramp it to the magnet current so it can be switched
control.set_source(control.magnet_current)
else:
# The heater is on --> so go to the target
action = control.magnet_read_action()
set_current = control.magnet_read_numeric("CSET")
if action != "RTOS" or abs(set_current - control.target_field * control.a_to_b) > 0.005:
# The source is not ramping --> Ramp it to the magnet current so it can be switched
target_current = control.target_field * control.a_to_b
control.set_source(target_current)
elif control.heater != control.target_heater:
""" The magnet is at the target field but the heater is not in the target state
There are two possibilities
1. The heater is now ON --> turn it off and ramp the source down
2. The heater is OFF --> Set the source to magnet current and turn it on
"""
if control.heater:
# The heater is on
if control.magnet_check_switchable():
control.magnet_set_heater(False)
# Set the source flag to tell the source to ramp to zero
source_flag = True
else:
# The heater is not on
if control.magnet_check_switchable():
# The switch heater can be switched ON --> so switch it ON
control.magnet_set_heater(True)
# this will set the lock so we need to get out of the loop without doing anything else
else:
# The switch heater is not on
action = control.magnet_read_action()
if action != "RTOS":
# The source is not ramping --> Ramp it to the magnet current so it can be switched
control.set_source(control.magnet_current)
if not control.lock and source_flag:
# The source_flag has been set ramp the source to zero and unset the flag
control.magnet_set_action("RTOZ")
source_flag = False
time.sleep(0.4)