-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathelastic.py
66 lines (61 loc) · 1.97 KB
/
elastic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import sys
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
import json
class ElasticImporter:
    """Bulk-load a file of JSON-encoded tweets into an Elasticsearch index.

    The import runs as soon as the object is constructed; call
    ``checkImport`` afterwards to print what ended up in the index.
    """

    # strptime format of each document's "date" field,
    # e.g. "Wed Oct 10 20:19:24 +0000 2018".
    DATE_FORMAT = "%a %b %d %H:%M:%S %z %Y"

    def __init__(self, path, index="tweets", doc_type="tweet", bulk=100000, es=None):
        """Connect to Elasticsearch and import every tweet in *path*.

        Args:
            path: file containing one JSON object per line; each object must
                have a "date" key formatted as ``DATE_FORMAT``.
            index: name of the target index.
            doc_type: value sent as each bulk action's ``_type``. Mapping
                types are deprecated in Elasticsearch 7 and removed in 8;
                pass ``None`` to leave ``_type`` out of the actions.
            bulk: number of documents sent per bulk request.
            es: an existing client to reuse; when omitted a default
                ``Elasticsearch()`` client is created.

        Raises:
            ValueError: if *bulk* is smaller than 1 (no batch would ever
                fill, so the whole file would be buffered in memory).
        """
        if bulk < 1:
            raise ValueError("bulk must be a positive integer, got %r" % (bulk,))
        self.es = Elasticsearch() if es is None else es
        self.path = path
        self.index = index
        self.doc_type = doc_type
        self.bulk = bulk
        self.importTweets()

    def _make_action(self, line):
        """Turn one line of the input file into a bulk "create" action.

        The document's "date" value is re-encoded from ``DATE_FORMAT`` to
        ISO 8601 so Elasticsearch can index it as a date.

        Returns:
            The action dict, or ``None`` if *line* is blank.
        """
        if not line.strip():
            return None
        doc = json.loads(line)
        doc["date"] = datetime.strptime(doc["date"], self.DATE_FORMAT).isoformat()
        action = {"_op_type": "create", "_index": self.index}
        if self.doc_type is not None:
            action["_type"] = self.doc_type
        action["_source"] = doc
        return action

    def importTweets(self):
        """Stream the file at ``self.path`` into Elasticsearch in batches.

        Lines are read lazily and sent every ``self.bulk`` documents, so the
        whole file is never held in memory as one list. Blank lines are
        skipped; the final partial batch is sent after the file is read.
        """
        import_actions = []
        total = 0
        # JSON text is UTF-8; don't depend on the platform's default encoding.
        with open(self.path, "r", encoding="utf-8") as file:
            for line in file:
                action = self._make_action(line)
                if action is None:
                    continue
                import_actions.append(action)
                if len(import_actions) == self.bulk:
                    total += self.bulk
                    print("I have now processed:" + str(len(import_actions)) + " more total:" + str(total))
                    helpers.bulk(self.es, import_actions)  # bulk import the tweets
                    import_actions = []
        # Flush whatever is left over from the last, partially filled batch.
        if import_actions:
            helpers.bulk(self.es, import_actions)

    @staticmethod
    def _hit_total(hits):
        """Return the hit count from the ``hits`` object of a search response.

        Elasticsearch < 7 reports ``total`` as a plain int; 7+ reports an
        object such as ``{"value": 42, "relation": "eq"}``.
        """
        total = hits["total"]
        if isinstance(total, dict):
            return total["value"]
        return total

    def checkImport(self):
        """Refresh the index and print the hits of a ``match_all`` search.

        Only the hits returned by a single search request are printed (the
        request uses Elasticsearch's default page size). Each ``_source`` is
        expected to contain "user_id", "tweet_id" and "text" keys.
        """
        # Make the freshly indexed documents visible to search.
        self.es.indices.refresh(index=self.index)
        res = self.es.search(index=self.index, body={"query": {"match_all": {}}})
        print("Got %d Hits:" % self._hit_total(res["hits"]))
        for hit in res["hits"]["hits"]:
            print("%(user_id)s %(tweet_id)s: %(text)s" % hit["_source"])
def main(path):
    """Import the tweets file at *path* into Elasticsearch and print the result.

    Constructing the importer performs the import; ``checkImport`` then
    prints the hits found in the index.
    """
    announcement = "Will import " + path + " into elasticsearch"
    print(announcement)
    ElasticImporter(path).checkImport()
if __name__ == '__main__':
    # Expect exactly one CLI argument: the path to the data file.
    if len(sys.argv) != 2:
        # sys.exit with a string prints it to stderr and exits with status 1,
        # so callers/scripts can detect the usage error.
        sys.exit("Supply exactly one argument for the filepath to the datafile.")
    main(str(sys.argv[1]))