-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtornado_run_streaming_and_save.py
51 lines (38 loc) · 1.44 KB
/
tornado_run_streaming_and_save.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from tornado import ioloop, gen, httpclient
import json
import motor
class DataReciever(object):
    """Feed a server-sent-events byte stream into MongoDB.

    ``streaming_callback`` is meant to be handed to Tornado's HTTP client.
    It reassembles lines that arrive split across network chunks, parses
    the JSON payload of every ``data:`` line and inserts it into
    ``test_database.test_collection``.
    """

    def __init__(self):
        # Raw bytes of the last, not yet newline-terminated line of the
        # previous chunk; None when nothing is pending.
        self.partial_data = None
        # Not used by this class; kept so existing attribute access keeps working.
        self.partial_chars = None
        self.client = motor.motor_tornado.MotorClient()
        self.db = self.client['test_database']

    def my_callback(self, result, error):
        """Report the outcome of one insert.

        Args:
            result: The insert result (must expose ``inserted_id``), or
                None when the insert failed.
            error: The exception raised by the insert, or None on success.
        """
        if error is not None:
            # On failure there is no result object to read inserted_id from.
            print('insert failed: %s' % repr(error))
            return
        print('result %s' % repr(result.inserted_id))

    def _on_insert_done(self, future):
        """Translate a finished insert future into a ``my_callback`` call."""
        error = future.exception()
        if error is not None:
            self.my_callback(None, error)
        else:
            self.my_callback(future.result(), None)

    def streaming_callback(self, input):
        """Consume one chunk of the response body.

        Args:
            input: Raw bytes of the chunk. Chunk boundaries are arbitrary:
                they may fall in the middle of a line or even in the middle
                of a multi-byte UTF-8 character.
        """
        # Buffer at the byte level and decode complete lines only, so a
        # character split across two chunks is never decoded in halves.
        pending = (self.partial_data or b"") + input
        # bytes.splitlines breaks on \n, \r and \r\n only, so Unicode line
        # separators inside a JSON string cannot cut a payload apart.
        pieces = pending.splitlines(True)
        if pieces and not pieces[-1].endswith((b"\n", b"\r")):
            # The tail has no terminator yet: hold it back for the next chunk.
            self.partial_data = pieces.pop()
        else:
            self.partial_data = None

        for piece in pieces:
            line = piece.rstrip(b"\r\n")
            if not line.startswith(b"data:"):
                # Other fields (event:, id:, ...) and blank lines are ignored.
                continue
            try:
                document = json.loads(line[5:].decode("utf-8"))
            except ValueError:
                # The line is complete, so waiting for more input cannot
                # repair it (UnicodeDecodeError is a ValueError as well).
                print('skipping malformed data line: %s' % repr(line[:80]))
                continue
            if not isinstance(document, dict):
                # Only JSON objects can be stored as documents.
                print('skipping non-object payload: %s' % repr(line[:80]))
                continue
            # Use the returned future rather than a callback= keyword so the
            # call does not depend on Motor's callback-style API.
            future = self.db.test_collection.insert_one(document)
            future.add_done_callback(self._on_insert_done)
@gen.coroutine
def main():
    """Stream the Wikimedia recent-change feed into a ``DataReciever``.

    Issues a single streaming fetch; every body chunk is passed to the
    receiver's ``streaming_callback`` as it arrives. The coroutine finishes
    when the fetch does.
    """
    fetcher = httpclient.AsyncHTTPClient()
    receiver = DataReciever()
    stream_url = 'https://stream.wikimedia.org/v2/stream/recentchange'
    fetch_options = {
        'streaming_callback': receiver.streaming_callback,
        # Overall request timeout; Tornado documents this value as seconds.
        'request_timeout': 3600,
    }
    yield fetcher.fetch(stream_url, **fetch_options)
if __name__ == '__main__':
    # IOLoop.instance() is a deprecated alias of IOLoop.current() (Tornado 5.0+).
    # run_sync starts the loop, runs main() to completion, then stops the loop.
    ioloop.IOLoop.current().run_sync(main)