-
Notifications
You must be signed in to change notification settings - Fork 0
/
rethinkes.py
94 lines (73 loc) · 2.88 KB
/
rethinkes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import sys
import argparse
import rethinkdb as r
import configparser
from time import sleep
# from retrying import retry
from elasticsearch import Elasticsearch
# Command-line interface. Parsing happens at module level, so the resulting
# ``args`` namespace is available to the script body further down.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--config',
    help='Choose a config file',
)
parser.add_argument(
    '--test',
    action="store_true",
    help='Test and exit',
)
args = parser.parse_args()
# @retry(wait_fixed=5000)
def make_elastic_client(hosts, http_auth=('elastic', 'changeme')):
    """Build an Elasticsearch client for ``hosts``.

    The client is created with sniffing enabled on start-up and on
    connection failure, retry-on-timeout enabled and a sniffer timeout of
    60.

    Args:
        hosts: Host specification, passed unchanged to ``Elasticsearch``.
        http_auth: ``(user, password)`` pair used for HTTP authentication.
            Defaults to the credentials that were previously hard-coded
            here, so existing callers behave the same.

    Returns:
        The client instance, or ``None`` if it could not be created (the
        error is printed).
    """
    try:
        return Elasticsearch(
            hosts,
            sniff_on_start=True,
            sniff_on_connection_fail=True,
            retry_on_timeout=True,
            sniffer_timeout=60,
            http_auth=http_auth,
        )
    except Exception as exc:
        # Catch ``Exception`` rather than everything, so that Ctrl-C and
        # SystemExit still propagate, and report why the connection failed.
        print('Error while connecting to elasticsearch:', exc)
        return None
def start_sync(keep_id, rdbhost, rdbport, database, tables, hosts, doctype, **kwargs):
    """Copy every document of the given RethinkDB tables into Elasticsearch.

    Each table is written to the Elasticsearch index of the same name.

    Args:
        keep_id: When true, each document's ``id`` field is also used as
            the Elasticsearch document id.
        rdbhost: RethinkDB host.
        rdbport: RethinkDB port.
        database: Name of the RethinkDB database to read from.
        tables: Iterable of table names to copy.
        hosts: Elasticsearch hosts, forwarded to ``make_elastic_client``.
        doctype: Value passed as ``doc_type`` for every indexed document.
        **kwargs: Optional flags:
            ``create_index`` (default ``False``): create the index (an
            HTTP 400 response is ignored); when false, a missing index
            aborts the program.
            ``wait_for_index`` (default ``False``): poll once per second
            until the index exists before going on.

    Raises:
        SystemExit: With status 1 when no Elasticsearch client could be
            created, or when an index is missing and ``create_index`` is
            false.
    """
    create_index = kwargs.get('create_index', False)
    wait_for_index = kwargs.get('wait_for_index', False)

    # Connect first to elasticsearch
    es = make_elastic_client(hosts)
    if es is None:
        # make_elastic_client reports failures by returning None; stop here
        # instead of failing later with an AttributeError on ``es.indices``.
        print('Cannot sync without an elasticsearch connection')
        sys.exit(1)

    # Connect to rethinkdb. The connection is passed explicitly to run() and
    # closed afterwards so that repeated calls do not pile up connections.
    conn = r.connect(rdbhost, rdbport)
    try:
        for table in tables:
            if wait_for_index:
                while not es.indices.exists(index=table):
                    print('Waiting for index to be created...')
                    sleep(1)

            if create_index:
                es.indices.create(index=table, ignore=400)
            elif not es.indices.exists(index=table):
                print("Index not exists and I cannot create it")
                # This is a failure, so report it with a non-zero status.
                sys.exit(1)

            cursor = r.db(database).table(table).run(conn)
            for doc in cursor:
                options = {"id": doc['id']} if keep_id else {}
                es.index(index=table, doc_type=doctype,
                         body=doc, **options)
                print("INSERT: ", doc)
    finally:
        conn.close()
# Script body: load the sync settings from the config file given with
# --config and run the sync once, or forever when ``loop-time`` is positive.
if args.config:
    config = configparser.ConfigParser()
    # ConfigParser.read() skips files it cannot open and returns the list of
    # files it actually parsed, so an empty result means nothing was loaded.
    if not config.read(args.config):
        print('Cannot read config file: {}'.format(args.config))
        sys.exit(1)

    keep_id = bool(int(config['global']['keep-id']))
    looptime = int(config['global']['loop-time'])

    rdbhost = config['rethinkdb']['host']
    # Config values are strings; hand the driver a numeric port.
    rdbport = int(config['rethinkdb']['port'])
    database = config['rethinkdb']['database']
    # Tolerate spaces around the commas and drop empty entries.
    tables = [name.strip()
              for name in config['rethinkdb']['tables'].split(',')
              if name.strip()]

    hosts = [host.strip()
             for host in config['elasticsearch']['hosts'].split(',')
             if host.strip()]
    doctype = config['elasticsearch']['doctype']
    create_index = bool(int(config['elasticsearch']['create-index']))
    wait_for_index = bool(int(config['elasticsearch']['wait-for-index']))

    sync_args = (keep_id, rdbhost, rdbport,
                 database, tables, hosts, doctype)
    # Both run modes get the same options; the single-run call used to drop
    # ``wait_for_index``.
    sync_options = {
        'create_index': create_index,
        'wait_for_index': wait_for_index,
    }

    if looptime > 0:
        # Sync forever, pausing ``loop-time`` seconds between passes.
        while True:
            start_sync(*sync_args, **sync_options)
            sleep(looptime)
    else:
        start_sync(*sync_args, **sync_options)

if args.test:
    sys.exit(0)