-
Notifications
You must be signed in to change notification settings - Fork 0
/
7_schema_demo_analytics.py
executable file
·86 lines (75 loc) · 2.31 KB
/
7_schema_demo_analytics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env python3
import datetime
import getopt
import json
import random
import sys
from bson import json_util
from pymongo import UpdateOne
from shared.schema_demo_base import SchemaDemoBase
class SchemaDemo(SchemaDemoBase):
    """Aggregation queries over documents that embed a ``measurements`` array."""

    def find_pressure_average(self, collection):
        """Return the average pressure of the first per-sensor group.

        The pipeline unwinds ``measurements``, groups the unwound documents
        by ``sensor_id`` and averages ``measurements.pressure`` per group.

        Args:
            collection: Object exposing ``aggregate(pipeline)``.

        Returns:
            The ``avg_pressure`` value of the first group returned by the
            pipeline, or ``None`` when the pipeline yields no groups (for
            example an empty collection).
        """
        pipeline = [
            {
                '$unwind': {
                    'path': '$measurements'
                }
            }, {
                '$group': {
                    '_id': '$sensor_id',
                    'avg_pressure': {
                        '$avg': '$measurements.pressure'
                    }
                }
            }
        ]
        results = list(collection.aggregate(pipeline))
        if not results:
            # Nothing to average; report "no data" instead of raising a
            # bare IndexError from results[0].
            return None
        # NOTE(review): $group does not order its output, so with several
        # sensor_ids the group picked here is arbitrary - confirm that one
        # sensor per collection is the intended data shape.
        return results[0]['avg_pressure']

    def find_pressure_spike(self, collection, value):
        """Print the three highest measurements with pressure >= ``value``.

        Documents are first narrowed to those holding at least one matching
        measurement; each document's ``measurements`` array is then filtered
        down to the matching items (projected as ``measurement``), unwound,
        sorted by pressure in descending order and limited to three results.
        Every result is printed as indented JSON.

        Args:
            collection: Object exposing ``aggregate(pipeline)``.
            value: Pressure threshold (inclusive).
        """
        pipeline = [
            {
                # Cheap pre-filter so $filter only runs on candidate documents.
                '$match': {
                    'measurements.pressure': {
                        '$gte': value
                    }
                }
            }, {
                '$project': {
                    'measurement': {
                        '$filter': {
                            'input': '$measurements',
                            'as': 'item',
                            'cond': {
                                '$gte': [
                                    '$$item.pressure', value
                                ]
                            }
                        }
                    },
                    'sensor_id': 1,
                    'count': 1
                }
            }, {
                '$unwind': {
                    'path': '$measurement'
                }
            }, {
                '$sort': {
                    'measurement.pressure': -1
                }
            }, {
                '$limit': 3
            }
        ]
        results = collection.aggregate(pipeline)
        for doc in results:
            # json_util.default serialises BSON-specific values that the
            # plain json encoder cannot handle.
            print(json.dumps(doc, default=json_util.default, indent=4))
if __name__ == "__main__":
    # Script entry point: compute an average pressure for the collection and
    # then report the strongest readings at or above that average.
    try:
        test = SchemaDemo()
        collection = test.init(sys.argv, 'schema_demo5', drop_collection=False)
        # Compare against None explicitly: PyMongo 4 Collection objects raise
        # NotImplementedError when evaluated in a boolean context.
        # NOTE(review): assumes init() signals failure by returning None -
        # confirm against SchemaDemoBase.
        if collection is not None:
            avg_pressure = test.find_pressure_average(collection)
            if avg_pressure is None:
                # No numeric pressure data: a null threshold would make the
                # spike query meaningless, so skip it.
                print('No pressure measurements found')
            else:
                test.find_pressure_spike(collection, avg_pressure)
    except KeyboardInterrupt:
        print('Exiting')