-
Notifications
You must be signed in to change notification settings - Fork 6
/
app.py
185 lines (157 loc) · 5.76 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import functools, logging, logging.config, redis, gzip, time, json
from flask.wrappers import Response
from lxml import etree
from flask import Flask, request, jsonify, render_template, send_from_directory
from flask_cors import CORS, cross_origin
from index_datas.simple_search import simple_match_search
from elasticsearch import Elasticsearch
from summary_1.summary import body_summary
from knn_indexing.index import knn_query
from modules.RateLimiter.src.request_handler import RateLimiter
from urllib3.exceptions import SSLError
from collections import OrderedDict
from typing import Callable
from fast_autocomplete import AutoComplete
# Tuning values handed to `autocomplete.search` by the /suggest route
# (as its `max_cost` and `size` arguments respectively).
AUTOCOMPLETE_MAX_COST = 3
AUTOCOMPLETE_SIZE = 5
# Logging is configured from a file next to the app; loggers that already exist stay enabled.
logging.config.fileConfig(fname="flask_logging.conf", disable_existing_loggers=False)
logger = logging.getLogger(__name__)
# Key passed to the Redis `cf.init` / `cf.check` / `cf.add` commands.
CF_KEY = "news"
# NOTE(review): not referenced in the visible part of this file -- confirm it is still needed.
INDEX_NAME = "knn_index"
# NOTE(review): credentials are hard-coded and certificate verification is disabled --
# confirm this is only ever used against a local development cluster.
ES = Elasticsearch("admin:admin@localhost:9200", verify_certs=False, ssl_show_warn=False)
app = Flask(__name__)
# Wrap the whole application with flask_cors.
cors = CORS(app)
app.config["CORS_HEADERS"] = "Content-Type"
def getAutocompleteEntries() -> dict:
    """Build the word dictionary used to seed the autocomplete engine.

    Streams the gzip-compressed Wikipedia abstract dump and samples every
    78th ``<doc>`` element (documents 1, 79, 157, ...). For each sampled
    document the ``<title>`` text, minus its first 11 characters, becomes a
    key of the result (presumably this strips a ``"Wikipedia: "`` prefix --
    TODO confirm against the dump). Sampled documents without a ``<title>``
    are skipped instead of aborting the whole initialisation.

    Returns:
        A dict mapping each sampled, trimmed title to an empty dict.
    """
    words = {}
    logger.info("Starting autocomplete words intialisation...")
    with gzip.open('wikidump/enwiki-20210820-abstract.xml.gz', 'rb') as f:
        docs = etree.iterparse(f, events=('end',), tag='doc')
        for doc_id, (_, element) in enumerate(docs, start=1):
            if doc_id % 78 == 1:
                title = element.findtext('./title')
                if title is None:
                    # findtext() returns None when the element is missing;
                    # slicing None would raise TypeError at import time.
                    logger.debug("Skipping [id=%s]: no title element", doc_id)
                else:
                    logger.debug(
                        "Creating autocomplete word entry for [id=%s] [title=%s]",
                        doc_id,
                        title,
                    )
                    words[title[11:]] = {}
            # Drop the element's content once it has been inspected.
            element.clear()
    logger.info("Finished autocomplete words intialisation")
    return words
# Built once at import time: getAutocompleteEntries() reads the whole dump
# before the module finishes loading.
autocomplete = AutoComplete(words=getAutocompleteEntries())
class ErrorHandlerWrapper:
    """Decorator that turns exceptions raised by a view into JSON error responses.

    Usage: ``@ErrorHandlerWrapper()`` directly above the view function. The
    decorated view returns whatever the view returns; if the view raises, a
    ``(json_response, status_code)`` tuple is returned instead, whose body has
    the keys ``status_code`` and ``status``.
    """

    def __call__(self, func):
        """Wrap *func* so that any exception becomes a JSON error payload."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Kept from the original implementation: records the address of
            # the most recent caller on the decorator instance.
            self.client_addr = request.remote_addr
            try:
                with self:
                    return func(*args, **kwargs)
            except SSLError as e:
                # logger.exception logs at ERROR level and attaches the
                # traceback; passing `e` as a stray positional argument would
                # make the logging call itself fail to format.
                logger.exception("An issue with SSL occured")
                # urllib3's SSLError carries no HTTP status, so fall back to 500.
                status_code = getattr(e, "status_code", 500)
                response = {
                    "status_code": status_code,
                    "status": "An issue with SSL occured: {}".format(e)
                }
                return jsonify(response), status_code
            except Exception as e:
                logger.exception("An internal error occurred")
                response = {
                    "status_code": 500,
                    "status": "An internal error occurred: {}".format(e)
                }
                return jsonify(response), 500
        return wrapper

    def __enter__(self):
        """No-op context entry."""
        return

    def __exit__(self, _a, _b, _c):
        """No-op context exit; returns None so exceptions propagate."""
        return
class TimedResponseWrapper:
    """Decorator that adds a ``duration`` field to a view's JSON object response.

    Usage: ``@TimedResponseWrapper()`` directly above the view function. The
    wrapped view must return a response object exposing ``get_data()`` with a
    JSON body. When that body is a JSON object, a ``duration`` key holding the
    elapsed time in nanoseconds is added and the response is re-serialised
    with the original status code. Any other JSON value (for example the empty
    list returned for an empty query) has nowhere to put the field, so the
    original response is returned untouched.
    """

    def __call__(self, func):
        """Wrap *func* so its JSON object response carries the elapsed time."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time_ns()
            response: Response = func(*args, **kwargs)
            payload = json.loads(response.get_data())
            if not isinstance(payload, dict):
                # Item assignment by string key only works on JSON objects;
                # on a list it would raise TypeError.
                return response
            payload["duration"] = time.time_ns() - start_time
            timed_response = jsonify(payload)
            # Rebuilding the response must not silently reset the status.
            timed_response.status_code = response.status_code
            return timed_response
        return wrapper

    def __enter__(self):
        """No-op context entry."""
        return

    def __exit__(self, _a, _b, _c):
        """No-op context exit; returns None so exceptions propagate."""
        return
# Shared Redis connection, used both for the filter commands and the cached result lists.
rd = redis.Redis()
try:
    rd.execute_command("cf.init", CF_KEY, "64k")
except Exception:
    # Best-effort initialisation: a failure is assumed to mean the filter
    # already exists. Catching Exception (not a bare except) lets
    # KeyboardInterrupt/SystemExit through, and exc_info records the real
    # cause in case the assumption is wrong (e.g. Redis is unreachable).
    logger.warning("the filter is already initialized", exc_info=True)
def fingerprint(x):
    """Return the Unicode code point of the first character of *x*."""
    leading_char = x[0]
    return ord(leading_char)
@app.route("/")
@ErrorHandlerWrapper()
def index_page():
    """Render the landing page template for the site root."""
    template_name = "index.html"
    return render_template(template_name)
@app.route("/css/<path:filename>")
@ErrorHandlerWrapper()
def send_css(filename):
    """Serve *filename* from the ``static/css`` directory."""
    css_directory = "static/css"
    return send_from_directory(css_directory, filename)
@app.route("/js/<path:filename>")
@ErrorHandlerWrapper()
def send_js(filename):
    """Serve *filename* from the ``static/js`` directory."""
    js_directory = "static/js"
    return send_from_directory(js_directory, filename)
def _stable_query_hash(query: str) -> int:
    """Return a signed 64-bit hash of *query* that is the same in every process.

    The built-in ``hash()`` is salted per interpreter process for ``str``, so
    values stored in Redis by one process would not match in another one (or
    after a restart). The result stays within the signed 64-bit range.
    """
    import hashlib  # local import keeps this block self-contained

    digest = hashlib.blake2b(query.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, byteorder="big", signed=True)


def base_search(query: str, search_method: Callable[[Elasticsearch, str, str], dict]):
    """Answer *query* from the Redis cache or, failing that, from a fresh search.

    Args:
        query: The user's search text. An empty or missing query yields an
            empty JSON list.
        search_method: Callable invoked as ``search_method(ES, "news", query)``;
            its result must expose the hits under ``["hits"]["hits"]``.

    Returns:
        A JSON response whose ``result`` object holds the hits under
        ``result`` and their origin (``"Redis"`` or ``"Elasticsearch"``)
        under ``from``.
    """
    if not query:
        return jsonify([])

    query_hash = _stable_query_hash(query)
    query_fp = fingerprint(query)

    in_result = rd.execute_command("cf.check", CF_KEY, query_hash, query_fp)
    if int(in_result) == 1:
        # NOTE(review): if the filter can report false positives, the list
        # read here may be empty for a query that was never searched -- confirm.
        cached_items = rd.execute_command("LRANGE", query, 0, -1)
        result_value = [
            json.loads(item, object_pairs_hook=OrderedDict)
            for item in cached_items
        ]
        return jsonify(result={
            "result": result_value,
            "from": "Redis"
        })

    res = search_method(ES, "news", query)
    list_res = res["hits"]["hits"]
    # Summarise every hit before anything is cached, so a failure here
    # leaves the cache untouched.
    for hit in list_res:
        sum_txt = body_summary(hit["_source"]["art"])
        hit["_source"]["summary"] = " ".join(sum_txt)

    rd.execute_command("cf.add", CF_KEY, query_hash, query_fp)
    # The filter reported this query as not cached, so any list already
    # stored under the key is stale; remove it rather than append duplicates.
    rd.execute_command("DEL", query)
    for hit in list_res:
        rd.execute_command("RPUSH", query, json.dumps(hit))
    return jsonify(result={
        "result": list_res,
        "from": "Elasticsearch"
    })
@app.route("/origin_search")
@cross_origin()
@ErrorHandlerWrapper()
@RateLimiter()
@TimedResponseWrapper()
def search():
    """Serve ``/origin_search``: run the ``query`` argument through the simple match search."""
    def run_simple_match(es, index_name, text):
        return simple_match_search(es, index_name, text)

    user_query = request.args.get("query", None, type=str)
    return base_search(user_query, run_simple_match)
@app.route("/search")
@cross_origin()
@ErrorHandlerWrapper()
@RateLimiter()
@TimedResponseWrapper()
def knn_search():
    """Serve ``/search``: run the ``query`` argument through the k-NN query."""
    def run_knn(_es, _index_name, text):
        # The client and index arguments supplied by base_search are ignored.
        return knn_query(text)

    user_query = request.args.get("query", None, type=str)
    return base_search(user_query, run_knn)
# Suggestion route: answers GET /suggest?input=<text> with a JSON object of
# autocomplete suggestions keyed "suggest0", "suggest1", ...
# `app.route` must be the outermost decorator; otherwise Flask registers the
# undecorated function and the CORS / error-handling wrappers never run.
@app.route("/suggest")
@cross_origin()
@ErrorHandlerWrapper()
def suggest():
    """Return autocomplete suggestions for the ``input`` query-string argument.

    Returns:
        A JSON object mapping ``suggest<N>`` to the N-th suggestion returned
        by the autocomplete engine; an empty object when ``input`` is
        missing or empty.
    """
    postdata = request.args.get("input", None, type=str)
    if not postdata:
        # Nothing to complete; avoid handing None to the autocomplete engine.
        return jsonify({})
    matches = autocomplete.search(
        word=postdata,
        max_cost=AUTOCOMPLETE_MAX_COST,
        size=AUTOCOMPLETE_SIZE
    )
    result = {
        "suggest{0}".format(position): match
        for position, match in enumerate(matches)
    }
    return jsonify(result)
def main():
    """Run the Flask application with debug mode switched on."""
    run_options = {"debug": True}
    app.run(**run_options)


if __name__ == "__main__":
    main()