-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFitFile_Handler.py
77 lines (60 loc) · 2.5 KB
/
FitFile_Handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# -*- coding: utf-8 -*-
import fitparse
import pytz
import csv
import os
import datetime
# ----------------------------------------------------------------------------
class FitFile_handler(object):
"""
This class is build to manage all the operations possible with a .fit file
- load a file
- parse a file
- send a data to the db
"""
UTC = pytz.UTC
CST = pytz.timezone('US/Central')
# ------------------------------------------------------------------------
def __init__(self, parameters, logger, file):
self.parameters = parameters
self.logger = logger
self.file = file
self.filename = self.file[:-4].split(os.sep)[-1]
self.allowed_fields = ['timestamp','position_lat','position_long',
'enhanced_altitude', 'altitude','enhanced_speed',
'speed', 'heart_rate','cadence','fractional_cadence', 'power']
self.required_fields = ['timestamp', 'position_lat', 'position_long', 'altitude']
# ------------------------------------------------------------------------
def load_file(self):
self.fitfile = fitparse.FitFile(self.file, data_processor=fitparse.StandardUnitsDataProcessor())
# ------------------------------------------------------------------------
def parse(self):
messages = self.fitfile.messages
self.data = []
for m in messages:
skip=False
if not hasattr(m, 'fields'):
continue
fields = m.fields
#check for important data types
mdata = {}
for field in fields:
if field.name in self.allowed_fields:
mdata[field.name] = field.value
mdata['event_name'] = self.filename
for rf in self.required_fields:
if rf not in mdata:
skip=True
if not skip:
self.data.append(mdata)
# ------------------------------------------------------------------------
def to_csv(self):
"""
Legacy function
"""
with open(self.filename + '.csv', 'w') as f:
writer = csv.writer(f)
writer.writerow(self.allowed_fields)
for entry in self.data:
writer.writerow([ str(entry.get(k, '')) for k in self.allowed_fields])
self.logger.debug('wrote %s' % (self.filename + '.csv'))