to_es.py
import faulthandler
faulthandler.enable()

import json
import time
from os import environ

import pandas as pd
from tqdm import tqdm
from elasticsearch import Elasticsearch, helpers
from elasticsearch_dsl import Search
ES_INDEX_FULL_TEXT = "nycdocs-use"
FIRST = False
ES_INDEX_CHUNK = "nycdocs-use-chunk128"
vector_dims = 512
batch_size = 512
total_chunks = 37281 # get this with `wc nyc_docs_paragraphs.json`
total_docs = 4251
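# The totals above are hardcoded; a sketch of deriving them from the input
# files instead (assumes the JSONL filenames actually read further down):
# total_chunks = sum(1 for _ in open('nyc_docs-sentences15.json'))
# total_docs = sum(1 for _ in open('nyc_docs.jsonl'))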
## Put ElasticSearch credentials here
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
if not es.ping():
    raise ValueError("Connection to ElasticSearch failed")
else:
    print('Connection to ElasticSearch OK')
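# This script assumes both indices already exist (their mappings, e.g. any
# dense_vector fields sized to vector_dims, are presumably set up elsewhere in
# this repo). A minimal sketch if they need to be created here, ignoring the
# 400 returned when an index already exists:
# es.indices.create(index=ES_INDEX_CHUNK, ignore=400)
# es.indices.create(index=ES_INDEX_FULL_TEXT, ignore=400)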
doc_counter = 0
# lookups between sequential chunk numbers and chunk ids (saved to JSON below)
idx_name_chunk = {}
name_idx_chunk = {}
def es_index_batch_chunk(lbatch):
    """Bulk-index one DataFrame batch of paragraph chunks into ES_INDEX_CHUNK."""
    global doc_counter
    records = []
    for body in lbatch.to_dict(orient='records'):
        # chunk id = document id + "c" + chunk number within the document
        id_ = body["_id"] + "c" + str(body["chonk"])
        idx_name_chunk[doc_counter] = id_
        name_idx_chunk[id_] = doc_counter
        body["page"] = doc_counter
        body["_index"] = ES_INDEX_CHUNK
        del body["smallenough"]
        body["doc_id"] = body["_id"]
        body["_id"] = id_
        records.append(body)
        doc_counter += 1
    res = helpers.bulk(es, records, chunk_size=len(records), request_timeout=200)
# stream the paragraph-chunk file in batches and bulk-index each batch
with tqdm(total=total_chunks) as pbar:
    for j, batch in enumerate(pd.read_json('nyc_docs-sentences15.json', lines=True, chunksize=batch_size)):
        # drop pathologically long chunks (>= 100,000 characters) before indexing
        batch["smallenough"] = batch["text"].apply(lambda x: len(x) < 100000)
        batch = batch[batch["smallenough"]]
        es_index_batch_chunk(batch)
        pbar.update(len(batch))
# save the chunk-id lookups to disk
with open(ES_INDEX_CHUNK + "_idx_name.json", 'w') as f:
    f.write(json.dumps(idx_name_chunk))
with open(ES_INDEX_CHUNK + "_name_idx.json", 'w') as f:
    f.write(json.dumps(name_idx_chunk))
# also put the full documents into ES
with open('nyc_docs.jsonl', 'r') as reader:
    for i, line_json in tqdm(enumerate(reader), total=total_docs):
        line = json.loads(line_json)
        body = {
            # truncate very long documents to 1,000,000 characters
            "text": line["_source"]["content"][:1000000],
            "routing": line.get("_routing", None),
        }
        es.index(index=ES_INDEX_FULL_TEXT, id=line["_id"], body=body)
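
# Quick sanity check after indexing: count the full-text documents and run a
# sample match query. This block is a sketch added for illustration; the query
# string is arbitrary and not part of the original pipeline.
es.indices.refresh(index=ES_INDEX_FULL_TEXT)
print("full-text docs indexed:", es.count(index=ES_INDEX_FULL_TEXT)["count"])
s = Search(using=es, index=ES_INDEX_FULL_TEXT).query("match", text="parking")
for hit in s[:3].execute():
    print(hit.meta.id, hit.text[:80])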