-
Notifications
You must be signed in to change notification settings - Fork 0
/
push_to_es.py
114 lines (97 loc) · 3.77 KB
/
push_to_es.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import ast
import json
import logging
import os
import time
import uuid

from elasticsearch import Elasticsearch, helpers
def create_index(es_object, index_name='spot'):
    """Create the detection-results index with its mapping if it is missing.

    Parameters
    ----------
    es_object : Elasticsearch
        Connected client (see ``connect_elasticsearch``).
    index_name : str
        Name of the index to create (default ``'spot'``).

    Returns
    -------
    bool
        True if the index was newly created, False if it already existed
        or creation failed.
    """
    # Index settings: single shard / no replicas (single-node setup) and a
    # raised max_result_window so large scans are allowed.
    settings = {
        "settings": {
            "index.max_result_window": 100000,
            "number_of_shards": 1,
            "number_of_replicas": 0
        },
        "mappings": {
            "properties": {
                # Several date formats accepted because upstream timestamps
                # are not zero-padded consistently.
                "date": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd H:m:s||yyyy-MM-dd H:m:ss||yyyy-MM-dd H:mm:ss||yyyy-MM-dd"},
                "time": {"type": "text"},
                "frame_id": {"type": "integer"},
                "camera_id": {"type": "integer"},
                "path": {"type": "text"},
                # One document per frame; detected objects are nested so each
                # object's fields are queried together.
                "objects": {
                    "type": "nested",
                    "properties": {
                        "class": {
                            "type": "text",
                            "fields": {
                                "keyword": {"type": "keyword", "ignore_above": 256}
                            }
                        },
                        "confidence": {"type": "float"},
                        "coordinates": {
                            "type": "nested",
                            "properties": {
                                "x": {"type": "float"},
                                "y": {"type": "float"},
                                "w": {"type": "float"},
                                "h": {"type": "float"}
                            }
                        }
                    }
                }
            }
        }
    }
    try:
        # Keyword argument: modern elasticsearch-py clients reject
        # positional `index`.  Single existence check (the old debug print
        # queried the cluster a second time).
        if not es_object.indices.exists(index=index_name):
            es_object.indices.create(index=index_name, body=settings)
            print('Created Index')
            return True
    except Exception as ex:
        # Best-effort: report and fall through to False rather than crash
        # the caller.  (The old `finally: return` swallowed *all* exceptions,
        # including KeyboardInterrupt.)
        print(str(ex))
    return False
def connect_elasticsearch():
    """Connect to a local Elasticsearch node on port 9200.

    Returns
    -------
    Elasticsearch
        The client instance.  NOTE(review): the client is returned even
        when ``ping()`` fails — callers get a dead client and only a
        console warning; confirm this best-effort behavior is intended.
    """
    print("Connecting...")
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    if es.ping():
        print('Connected to Elasticsearch .')
    else:
        print('Awww it could not connect!')
    return es
def get_data_from_file(path):
    """Parse the first line of *path* as a Python literal and return it.

    The file is expected to contain, on its first line, a Python-literal
    list of document dicts (detector output).

    Parameters
    ----------
    path : str
        Path to the results file.

    Returns
    -------
    list
        The parsed documents.

    Raises
    ------
    ValueError
        If the first line is not a valid Python literal.
    IndexError
        If the file is empty.
    """
    # `with` guarantees the handle is closed (the original leaked it), and
    # ast.literal_eval replaces eval() so file content cannot execute
    # arbitrary code.
    with open(path, encoding="utf8", errors='ignore') as fh:
        first_line = fh.read().splitlines(False)[0]
    return ast.literal_eval(first_line)
def bulk_json_data(json_path, _index, doc_type):
    """Yield Elasticsearch bulk actions for every file in *json_path*.

    Documents are produced lazily (generator) so the whole corpus is never
    held in memory at once.  Each action gets a fresh UUID as its ``_id``.

    Parameters
    ----------
    json_path : str
        Directory containing result files readable by
        ``get_data_from_file``.
    _index : str
        Target index name.
    doc_type : str
        Kept for interface compatibility; not used by the generator.
    """
    for filename in os.listdir(json_path):
        documents = get_data_from_file(path=os.path.join(json_path, filename))
        for record in documents:
            # Skip pre-formatted bulk-header entries; only raw documents
            # are wrapped into actions here.
            if '{"index"' in record:
                continue
            yield {
                "_index": _index,
                "_id": uuid.uuid4(),
                "_source": record,
            }
if __name__ == '__main__':
    # Quiet everything below ERROR so the elasticsearch client's chatter
    # does not flood the console.
    logging.basicConfig(level=logging.ERROR)

    client = connect_elasticsearch()
    index_name = "spot"
    create_index(client, index_name='spot')

    results_path = "./sample_results.txt"

    # Time only the bulk upload itself.
    t_start = time.time()
    response = helpers.bulk(client, bulk_json_data(results_path, index_name, "people"))
    t_end = time.time()

    print("succeccfully send...")
    print("time taken to send files .....", t_end - t_start)
    print(response)