-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.py
93 lines (71 loc) · 1.96 KB
/
index.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import sys
import argparse
import os
import yaml
import json
from psycopg2 import connect
from psycopg2.extras import RealDictCursor
from elasticsearch import Elasticsearch, helpers
from datafile import count_from_db, data_from_db, data_to_es
def init_es_index(index_name):
    """Drop the Elasticsearch index *index_name* and create it again.

    The index definition is loaded from the JSON file ``<index_name>.json``
    and passed unchanged as the ``body`` of the create request. The function
    uses the module-level ``es`` client, so that client must exist before
    this is called.
    """
    definition_path = f"{index_name}.json"
    with open(definition_path) as definition_file:
        index_definition = json.load(definition_file)

    # ignore_unavailable=True is passed on the delete; presumably this keeps a
    # first run (index not created yet) from failing -- confirm against the
    # client version in use.
    es.indices.delete(index=index_name, ignore_unavailable=True)
    es.indices.create(index=index_name, body=index_definition)
# Command-line interface: location of the settings file and whether the
# index should be dropped and recreated before loading data.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--config",
    default="settings.yaml",
    help="Config file location."
)
parser.add_argument(
    "--rebuild",
    action="store_true",
    help="Delete and create index."
)
args = parser.parse_args()

# Load the settings, stopping early when the config file does not exist.
if not os.path.isfile(args.config):
    sys.exit("Can't find settings.")
with open(args.config) as f:
    # NOTE(review): yaml.Loader is the full (unsafe) loader. If the settings
    # file can ever come from an untrusted source, switch to yaml.safe_load.
    settings = yaml.load(f, Loader=yaml.Loader)

# Open the PostgreSQL connection described under the "database" settings key.
# NOTE(review): the broad except also catches a KeyError from a missing
# settings key and reports it as a connection failure.
try:
    con = connect(
        host=settings["database"]["host"],
        port=settings["database"]["port"],
        user=settings["database"]["username"],
        password=settings["database"]["password"],
        database=settings["database"]["database"]
    )
except Exception:
    sys.exit("Can't connect to the database.")

# Build the Elasticsearch client from the "elasticsearch" settings key. The
# name `es` must stay at module level: init_es_index() reads it as a global.
# NOTE(review): constructing the client may not contact the server at all --
# confirm whether an explicit connectivity check is wanted here.
try:
    es_host = "{}:{}".format(
        settings["elasticsearch"]["host"],
        settings["elasticsearch"]["port"]
    )
    es = Elasticsearch([es_host])
except Exception:
    con.close()
    sys.exit("Can't connect to the Elasticsearch.")

# From here on the database connection (and then the cursor) must be released
# on every exit path, including an exception raised while rebuilding the
# index or while bulk indexing.
try:
    cur = con.cursor(cursor_factory=RealDictCursor)
    try:
        if args.rebuild:
            print("Rebuild index.")
            init_es_index(settings["index"]["name"])

        # Index in batches: `start` is the position handed back by
        # data_from_db() and fed into the next count/fetch; the loop ends
        # once count_from_db() reports nothing left.
        start = 0
        while True:
            to_go = count_from_db(cur, start)
            print("{:,} datafiles to index".format(to_go))
            if to_go <= 0:
                break
            (data, start) = data_from_db(
                cur, start, settings["index"]["limit"]
            )
            helpers.bulk(es, data_to_es(settings["index"]["name"], data))
        print("Completed.")
    finally:
        cur.close()
finally:
    con.close()