This repository has been archived by the owner on Jan 2, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy path_import.py
185 lines (140 loc) · 5.55 KB
/
_import.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# vim: set fileencoding=utf-8 :
from datetime import timedelta, datetime
import re
import sqlalchemy
from geopy.geocoders import Nominatim
from filters import radian_to_degree
from model import Sample
def parse_samples(samples, move):
    """Yield one ``Sample`` per node in *samples*, each linked to *move*.

    Every child element of a sample node is copied onto the new ``Sample``
    under its normalized tag name.  The ``events``, ``satellites`` and
    ``apps_data`` children are converted with ``parse_json``; all other
    children are assigned from their text through ``set_attr``.
    """
    structured_tags = ('events', 'satellites', 'apps_data')

    for node in samples:
        parsed = Sample()
        parsed.move = move

        for element in node.iterchildren():
            name = normalize_tag(element.tag)
            if name in structured_tags:
                # Nested content is stored as a parsed structure, not as text.
                setattr(parsed, name, parse_json(element))
            else:
                set_attr(parsed, name, element.text)

        yield parsed
def postprocess_move(move):
    """Apply defaults to *move* and look up the address of its first GPS sample.

    ``move.public`` becomes ``False`` when it is ``None``.  The first sample
    whose ``sample_type`` starts with ``'gps-'`` is then reverse-geocoded
    through Nominatim; on success ``move.location_address`` and
    ``move.location_raw`` are set.  Nothing is looked up when there is no GPS
    sample, when its coordinates are missing/zero, or when the geocoder finds
    no address.
    """
    if move.public is None:
        move.public = False

    # Only the first GPS sample matters, so stop scanning at the first match.
    first_sample = next((sample for sample in move.samples
                         if sample.sample_type and sample.sample_type.startswith('gps-')),
                        None)
    if first_sample is None:
        return

    latitude = first_sample.latitude
    longitude = first_sample.longitude
    # Truthiness check kept from the original: skips None as well as 0 values.
    if not (latitude and longitude):
        return

    geo_locator = Nominatim(user_agent='openmoves.net')
    # Coordinates go through radian_to_degree before being sent to the geocoder.
    location = geo_locator.reverse("%f, %f" % (radian_to_degree(latitude), radian_to_degree(longitude)), timeout=60)

    # reverse() yields None when no address is found; leave the move untouched
    # instead of failing with AttributeError on ``location.address``.
    if location is None:
        return

    move.location_address = location.address
    move.location_raw = location.raw
def normalize_move(move):
    """Clean up the raw values of a freshly imported *move* in place.

    * ``import_date_time`` is stamped with ``datetime.now()``.
    * ``ascent`` / ``descent`` are replaced by ``None`` when the matching
      ``ascent_time`` / ``descent_time`` is zero seconds long (the value
      itself is asserted to be zero in that case).
    * A zero-length ``recovery_time`` is replaced by ``None``.
    * The truncated activity name ``'Outdoor swimmin'`` is repaired.
    """
    def lasted_zero_seconds(duration):
        return duration.total_seconds() == 0

    move.import_date_time = datetime.now()

    if lasted_zero_seconds(move.ascent_time):
        assert move.ascent == 0
        move.ascent = None

    if lasted_zero_seconds(move.descent_time):
        assert float(move.descent) == 0
        move.descent = None

    if lasted_zero_seconds(move.recovery_time):
        move.recovery_time = None

    if move.activity == 'Outdoor swimmin':
        move.activity = 'Outdoor swimming'
def remove_namespace(tag, ns='http://www.suunto.com/schemas/sml'):
    """Return *tag* without its leading ``{namespace}`` qualifier.

    Tags that do not start with ``{`` are returned unchanged.  A qualified
    tag must use the namespace *ns*; anything else trips the assertion.
    """
    if not tag.startswith('{'):
        return tag

    prefix = "{%s}" % ns
    assert tag.startswith(prefix), "illegal tag: '%s'" % tag
    return tag[len(prefix):]
# Maps raw tag -> normalized tag so each distinct tag is converted only once.
normalized_tags_cache = {}


def normalize_tag(tag):
    """Return *tag* without namespace, converted from CamelCase to snake_case.

    Results are memoized in ``normalized_tags_cache``.
    """
    try:
        return normalized_tags_cache[tag]
    except KeyError:
        pass

    # http://stackoverflow.com/a/1176023/4308
    snake = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', remove_namespace(tag))
    snake = re.sub('([a-z0-9])([A-Z])', r'\1_\2', snake).lower()

    normalized_tags_cache[tag] = snake
    return snake
def _parse_datetime(value):
    """Parse ``YYYY-MM-DDTHH:MM:SS[.fraction][Z]`` into a naive ``datetime``.

    Microseconds are taken from the textual fraction digits (truncated to six)
    rather than from float arithmetic, which loses a microsecond for values
    such as ``.001``.
    """
    date_part, time_part = value.split('T')
    year, month, day = date_part.split('-')
    if time_part.endswith('Z'):
        time_part = time_part[:-1]
    hour, minute, seconds = time_part.split(':')

    whole_seconds, _, fraction = seconds.partition('.')
    second = int(whole_seconds) if whole_seconds else 0
    # Right-pad so that e.g. '.5' means 500000 microseconds.
    microsecond = int(fraction[:6].ljust(6, '0')) if fraction else 0

    return datetime(year=int(year),
                    month=int(month),
                    day=int(day),
                    hour=int(hour),
                    minute=int(minute),
                    second=second,
                    microsecond=microsecond)


def _convert_attr(column_type, value, attr, message):
    """Convert the text *value* to the Python type matching *column_type*.

    :param column_type: SQLAlchemy column type class; ``Float``, ``Interval``,
        ``Integer``, ``String`` and ``DateTime`` are supported.
    :param value: text to convert.
    :param attr: attribute name, only used in error messages.
    :param message: not used by this function; kept so existing callers
        keep working.
    :returns: ``float``, ``timedelta`` (value interpreted as seconds), ``int``
        (decimal, or hexadecimal with a ``0x`` prefix), the unchanged string,
        or a naive ``datetime``.
    :raises ValueError: if an ``Integer`` value cannot be parsed.
    :raises Exception: if *column_type* is none of the supported types.
    """
    sqltypes = sqlalchemy.sql.sqltypes

    if column_type == sqltypes.Float:
        return float(value)

    if column_type == sqltypes.Interval:
        return timedelta(seconds=float(value))

    if column_type == sqltypes.Integer:
        try:
            if value.startswith('0x'):
                return int(value, 16)
            return int(value, 10)
        except ValueError:
            raise ValueError("failed to parse %s: %s" % (attr, value))

    if column_type == sqltypes.String:
        return value

    if column_type == sqltypes.DateTime:
        return _parse_datetime(value)

    raise Exception("Unknown column type: %s for attribute '%s'" % (column_type, attr))
def set_attr(move, attr, value):
    """Convert *value* for the mapped attribute *attr* and assign it on *move*.

    The target type is taken from the single column behind the attribute on
    ``type(move)``; attributes backed by more than one column are rejected by
    the assertion.
    """
    columns = getattr(type(move), attr).property.columns
    assert len(columns) == 1

    error_message = "illegal value '%s' for '%s'" % (value, attr)
    converted = _convert_attr(type(columns[0].type), value, attr, error_message)
    setattr(move, attr, converted)
def add_children(move, element):
    """Copy the child elements of *element* onto *move* via ``set_attr``.

    Children tagged ``speed``, ``hr``, ``cadence``, ``temperature`` or
    ``altitude`` are containers: each of their own children is stored as
    ``<tag>_<sub_tag>``.  Every other child is stored directly under its
    normalized tag.
    """
    container_tags = ('speed', 'hr', 'cadence', 'temperature', 'altitude')

    for node in element.iterchildren():
        name = normalize_tag(node.tag)

        if name not in container_tags:
            set_attr(move, name, value=node.text)
            continue

        for leaf in node.iterchildren():
            flat_name = '_'.join((name, normalize_tag(leaf.tag)))
            set_attr(move, flat_name, value=leaf.text)
def _parse_recursive(node):
    """Convert *node* and its descendants into plain dicts, lists and strings.

    A node with children becomes a dict keyed by child tag (namespace
    removed); a tag that occurs more than once maps to a list of its values
    in document order.  A node without children becomes its text, or ``None``
    when the text is empty or missing.
    """
    if node.countchildren() > 0:
        result = {}
        for child in node.iterchildren():
            tag = remove_namespace(child.tag)
            # Lower-case the first letter, except for short (<= 2 chars) or
            # all-uppercase tags, which are kept as they are.
            if len(tag) > 2 and tag.upper() != tag:
                tag = tag[0].lower() + tag[1:]

            data = _parse_recursive(child)
            if tag in result:
                # Second occurrence: promote the single value to a list.
                if not isinstance(result[tag], list):
                    result[tag] = [result[tag]]
                result[tag].append(data)
            else:
                result[tag] = data
        return result

    # ``text`` may be None as well as ''; ``len(None)`` used to raise TypeError.
    return node.text or None


def parse_json(node):
    """Return the JSON-serializable structure for *node* (see ``_parse_recursive``)."""
    return _parse_recursive(node)