-
Notifications
You must be signed in to change notification settings - Fork 0
/
3_schema_demo_upsert_time_based_bucket.py
executable file
·51 lines (46 loc) · 1.76 KB
/
3_schema_demo_upsert_time_based_bucket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/usr/bin/env python3
import random
import sys
import getopt
import datetime
from pymongo import UpdateOne
from shared.schema_demo_base import SchemaDemoBase
class SchemaDemo(SchemaDemoBase):
    """Schema demo that upserts sensor readings into time-based buckets.

    Every reading is pushed onto the ``measurements`` array of the document
    matched by ``sensor_id``, ``date1`` and ``date2``; the upsert creates
    that document when it does not exist yet.
    """

    def create_sub_doc(self, time, delta):
        """Build a single measurement sub-document.

        Args:
            time: Base datetime for the reading.
            delta: Offset, in minutes, added to ``time``.

        Returns:
            A dict with ``timestamp`` (``time`` shifted by ``delta`` minutes)
            and random integer ``temperature`` (62 to 66) and ``moisture``
            (500 to 600) values, bounds inclusive.
        """
        timestamp = time + datetime.timedelta(minutes=delta)
        # Temperature is drawn before moisture so a seeded random generator
        # produces the same sequence of values.
        temperature = random.randint(62, 66)
        moisture = random.randint(500, 600)
        return {
            'timestamp': timestamp,
            'temperature': temperature,
            'moisture': moisture,
        }

    def insert_docs(self, collection):
        """Push readings into consecutive 60-minute bucket documents.

        Reads ``n``, ``b`` and ``start-date`` from ``self.options``. One
        bucket is processed per step of ``range(0, n, b)``, and ``b``
        readings (offset 0 to ``b - 1`` minutes from the bucket start) are
        pushed into each bucket.

        Args:
            collection: Target collection; used directly through
                ``update_one`` or handed to ``self.add_to_batch``.
        """
        total = self.options['n']
        per_bucket = self.options['b']
        window = datetime.timedelta(minutes=60)
        bucket_start = self.options['start-date']
        bucket_end = bucket_start + window

        for _ in range(0, total, per_bucket):
            for minute in range(per_bucket):
                reading = self.create_sub_doc(bucket_start, minute)
                # Fresh filter/update documents for every reading, so no two
                # operations share the same dict objects.
                bucket_filter = {
                    'sensor_id': 12345,
                    'date1': bucket_start,
                    'date2': bucket_end,
                }
                push_reading = {'$push': {'measurements': reading}}

                if not self.batch:
                    collection.update_one(
                        bucket_filter, push_reading, upsert=True)
                else:
                    self.add_to_batch(
                        collection,
                        UpdateOne(bucket_filter, push_reading, upsert=True),
                    )
                self.inc_doc_count()

            # Slide the window: the next bucket starts where this one ended.
            bucket_start = bucket_end
            bucket_end = bucket_start + window

        self.test_done(collection)
if __name__ == "__main__":
    # Script entry point: initialise the demo against the 'schema_demo3'
    # collection (dropping any existing one) and run the insert workload.
    # Ctrl-C ends the run with a short message instead of a traceback.
    try:
        test = SchemaDemo()
        collection = test.init(sys.argv, 'schema_demo3', drop_collection=True)
        # Do not rely on truthiness here: PyMongo 4 Collection objects raise
        # NotImplementedError when evaluated in a boolean context.
        # NOTE(review): presumably init() returns None (or False) when the
        # run should be skipped -- confirm against SchemaDemoBase.init.
        if collection is not None and collection is not False:
            test.insert_docs(collection)
    except KeyboardInterrupt:
        print('Exiting')