-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata.py
executable file
·211 lines (158 loc) · 6.22 KB
/
data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# -*- encoding: utf-8 -*-
import estructures
import comunicacio
"""
Mòdul data
==========
S'encarrega de la traducció de paquets a dades i de dades a paquets.
"""
# Conversió de dades
def data2int(v, len_tipus_param):
"""
Conversor entre valors i representació hexadecimal del valor en un
string. Si li passem un valor ens retorna el la seva representació
hexadecimal en un string que te en compte la llargada del
tipus de dada.
:param int v: Valor que volem convertir
:param int len_v_type: Llargada del tipus de dada
:rtype: str corresponent a la conversió de valor
>>> data2int(-1, 2) == '\xff\xff'
True
>>> data2int(-1, 4) == '\xff\xff\xff\xff'
True
>>> data2int(0xa0, 1) == '\xa0'
True
"""
str_value = ""
for i in range(len_tipus_param):
mascara = 0xff << (8*i)
byte_value = (v & mascara) >> (8*i)
str_value += chr(byte_value)
return str_value
def str2int_unsigned(str_valor_param, len_tipus_param):
"""
Conversor entre cadenes de caràcters i enters. Prenent cada caràcter com un
byte. La cadena de caràcters representa un enter sense signe.
:param int str_valor_param: Cadena de caràcters que codifica un enter sense signe
:param int len_tipus_param: Llargada del tipus de paràmetre
:rtype: int
>>> str2int_unsigned('\xff',1)
255
>>> str2int_unsigned('\xff\xff',2)
65535
>>> str2int_unsigned('\xaa',1)
170
"""
acomulador = 0
byte_index = range(len_tipus_param)
byte_index.reverse()
for i in byte_index:
acomulador += ord(str_valor_param[i]) << (8 * i)
return acomulador
def str2int_signed(str_valor_param, len_tipus_param):
"""
Conversor entre cadenes de caràcters i enters. Prenent cada caràcter com un
byte. La cadena de caràcters representa un enter amb signe.
:param int str_valor_param: Cadena de caràcters que codifica un enter amb signe
:param int len_tipus_param: Llargada del tipus de paràmetre
:rtype: int
>>> str2int_signed('\xff',1)
-1
>>> str2int_signed('\xff\xff',1)
-1
>>> str2int_signed('\xaa',1)
-86
"""
signed_mask = 0x80 << (8 * (len_tipus_param - 1))
unsigned_mask = signed_mask - 1
unsigned = str2int_unsigned(str_valor_param, len_tipus_param)
return (unsigned & unsigned_mask) - (unsigned & signed_mask)
# Empaquetar/Desempaquetar
def dades2paquet(tx_data):
"""
Passem un diccionari que conte 'tipus' i 'estructura'. La clau
'tipus' te associat el tipus de paquet que es i la clau
'estructura' te associat un diccionari que conte el nom dels
paràmetres com a clau i el valor dels paràmetres com a valor.
:param dict tx_data: Diccionari que conte l'identificador del tipus de paquet i l'estructura de les dades
:rtype: str paquet corresponent a les dades de l'estructura
>>> dades2paquet({'tipus': 'di', 'estructura': {'pitch': 0x1234, 'roll': 0x5678, 'yaw': 0x1234, 'thrust': 4321, 'ctrl': 0xB1B2, 'checksum': 0xAAAA}}) == 'di\x34\x12\x78\x56\x34\x12\x21\x43\xb1\xb2\xaa\xaa'
True
>>> dades2paquet({'tipus': '','estructura': ''})
"""
paquet = ""
tipus = tx_data['tipus']
estructura = tx_data['estructura']
for i in range(len(estructures.tx_structure[tipus][0])):
nom_param = estructures.tx_structure[tipus][0][i]
tipus_param = estructures.tx_structure[tipus][1][i]
valor_param = estructura[nom_param]
len_tipus_param = estructures.type_values[tipus_param]
str_valor_param = data2int(valor_param, len_tipus_param)
paquet += str_valor_param
return tipus+paquet
def paquet2dades(rx_paquet):
"""
Li passem un paquet i retorna l'identificador del paquet i un
diccionari amb les dades extretes del paquet.
:param dict tx_data: Diccionari que conte l'identificador del tipus de paquet i l'estructura de les dades
:rtype: str paquet corresponent a les dades de l'estructura
>>> paquet2dades('Ahsask')
{'estructura': {}, 'tipus': 'No struct'}
>>> paquet2dades('\x01\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff')
{'estructura': {'temp_ADC': 4294967295, 'temp_gyro': 65535, 'acc_z': -1, 'pressure': -1, 'gyro_z': -1, 'gyro_x': -1, 'gyro_y': -1, 'acc_x': -1, 'mag_y': -1, 'mag_x': -1, 'acc_y': -1, 'mag_z': -1}, 'tipus': 'IMU_RAWDATA'}
"""
codi_tipus = ord(rx_paquet[0])
dades = rx_paquet[1:]
if codi_tipus in estructures.descriptor2structure.keys():
tipus = estructures.descriptor2structure[codi_tipus]
estructura = {}
for i in range(len(estructures.rx_structure[tipus][0])):
nom_param = estructures.rx_structure[tipus][0][i]
tipus_param = estructures.rx_structure[tipus][1][i]
len_tipus_param = estructures.type_values[tipus_param]
str_valor_param = dades[:len_tipus_param]
dades = dades[len_tipus_param:]
if 'u' in tipus_param:
valor_param = str2int_unsigned(str_valor_param, len_tipus_param)
else:
valor_param = str2int_signed(str_valor_param, len_tipus_param)
estructura[nom_param] = valor_param
return {'tipus': tipus, 'estructura': estructura}
else:
return {'tipus': 'No struct', 'estructura': {}}
# Tx/Rx
def tx_data(tx_data):
"""
Empaqueta les dades i les envia
:param dict tx_data: Diccionari que conte l'identificador del tipus de paquet i l'estructura de les dades
"""
tx_paquet = dades2paquet(tx_data)
comunicacio.tx_paquet(tx_paquet)
def rx_data():
"""
Demana un paquet el transforma en dades i el retorna
Retorna un diccionari amb l'identificador del paquet i l'estructura de
dades rebuda
:rtype: dict
"""
rx_paquet = comunicacio.rx_paquet()
if rx_paquet:
return paquet2dades(rx_paquet)
else:
return {}
def init():
"""
Inicia el canal de comunicació
Retorna True si s'ha pogut iniciar el canal de comunicació amb èxit
:rtype: bool
"""
return comunicacio.init()
def close():
"""
Tenca el canal de comunicació
"""
comunicacio.close()
if __name__ == "__main__":
import doctest
doctest.testmod()