Mongo cache (#34)
* mongodb cache

* logging

* add mongo init at start of work

* option to clear cache, better caching

* cache more queries

* more invariant logic for deciding when to cache

* cache if small result

* cache all multi token queries

* fix bug

* fix Python-related crash

* add json params to mongo clear to avoid crash

* remove debug log message

* set mongo timeout

* exception handling

* env

* fix old typo

* fix up server example

* fix log statement in wrong place
rmillikin authored Apr 18, 2022
1 parent 6bab183 commit fa0b15a
Showing 10 changed files with 141 additions and 60 deletions.
2 changes: 1 addition & 1 deletion .env
@@ -1,5 +1,5 @@
# change this to a directory on your local machine to store pubmed articles
PUBMED_DIR=/Users/rmillikin/PubmedAbstracts
PUBMED_DIR=/path/to/pubmed/folder

# password hash (password is 'password' by default; to change it, you need
# to generate a hash yourself using bcrypt and put it here)
8 changes: 7 additions & 1 deletion docker-compose.yml
@@ -7,7 +7,7 @@ services:
image: fast_km-server:build
command: --pw ${PASSWORD_HASH} # edit .env file to change password
ports:
- "5001:5000"
- "5099:5000" # HOST_PORT:CONTAINER_PORT
depends_on:
- redis
networks:
@@ -26,6 +26,12 @@
networks:
- fast_km-network

mongo:
image: mongo
command: --quiet --logpath /dev/null
networks:
- fast_km-network

redis:
image: redis
networks:
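With this service added, any container on fast_km-network can reach MongoDB at hostname mongo on the default port 27017, which is how the cache code in src/indexing/index.py below connects. A minimal connectivity check, sketched with pymongo (the short timeout mirrors the value used in that file):

import pymongo
from pymongo import errors

# 'mongo' resolves through Docker's internal DNS on fast_km-network;
# a short serverSelectionTimeoutMS keeps this from hanging if the
# service is down.
client = pymongo.MongoClient('mongo', 27017, serverSelectionTimeoutMS=500)

try:
    client.admin.command('ping')  # cheap round trip to confirm the server is up
    print('mongo is reachable')
except errors.ConnectionFailure:
    print('mongo is not reachable')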
9 changes: 7 additions & 2 deletions examples/server_example/.env
@@ -1,3 +1,8 @@
# change this to a directory on your local machine to store pubmed articles
#PUBMED_DIR=./pubmed
PUBMED_DIR=/path/to/my/PubmedAbstracts
PUBMED_DIR=/path/to/my/PubmedAbstracts

# password hash (password is 'password' by default; to change it, you need
# to generate a hash yourself using bcrypt and put it here)
# NOTE: I can't figure out how to use dollar signs in the hash. This is hacky,
# but replace $ with ____ (four underscores).
PASSWORD_HASH="____2b____12____YfgpDEOwxLy..UkZEe0H8.0aO/AQXpbsA4sAgZ9RWQShkG4iZYl16"
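For anyone changing the password, here is a hedged sketch of generating a new hash with flask-bcrypt (already a dependency in requirements.txt) and applying the dollar-sign substitution the note above describes; it assumes the server reverses the four-underscore mapping when it loads the hash:

from flask_bcrypt import Bcrypt

# generate_password_hash returns bytes like b'$2b$12$...'; decode to str
hash_str = Bcrypt().generate_password_hash('my-new-password').decode('utf-8')

# per the NOTE above, replace each '$' with four underscores before
# pasting the result into PASSWORD_HASH
print(hash_str.replace('$', '____'))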
41 changes: 25 additions & 16 deletions examples/server_example/docker-compose.yml
@@ -1,30 +1,39 @@
version: "3.3"
services:
server:
image: rmillikin/fast_km-server:dev
image: rmillikin/fast_km-server:latest
container_name: fast_km-server
command: --pw ${PASSWORD_HASH} # edit .env file to change this
ports:
- "5000:5000"
- "5099:5000" # HOST_PORT:CONTAINER_PORT
depends_on:
- redis
resources:
limits:
cpus: '3.0'
memory: '12gb'
networks:
- fast_km-network

workers:
image: rmillikin/fast_km-worker:dev
command: python -u src/run_worker.py --workers 1
image: rmillikin/fast_km-worker:latest
container_name: fast_km-worker
command: --workers 3
volumes:
- ${PUBMED_DIR}:/mnt/pubmed # edit .env file to change pubmed dir
- ${PUBMED_DIR}:/mnt/pubmed # edit .env file to change this
depends_on:
- redis
resources:
limits:
cpus: '10.0'
memory: '128gb'
networks:
- fast_km-network

mongo:
image: mongo
container_name: fast_km-mongo
command: --quiet --logpath /dev/null
networks:
- fast_km-network

redis:
image: redis
resources:
limits:
memory: '1gb'
container_name: fast_km-redis
networks:
- fast_km-network

networks:
fast_km-network:
33 changes: 3 additions & 30 deletions requirements.txt
@@ -1,42 +1,15 @@
aniso8601==9.0.1
arrow==1.2.0
attrs==21.2.0
certifi==2021.10.8
charset-normalizer==2.0.7
click==8.0.3
Flask==2.0.2
Flask-RESTful==0.3.9
idna==3.3
importlib-metadata==4.8.1
iniconfig==1.1.1
itsdangerous==2.0.1
Jinja2==3.0.2
joblib==1.1.0
MarkupSafe==2.0.1
nltk==3.6.5
numpy==1.21.2
packaging==21.0
pluggy==1.0.0
py==1.10.0
pygtrie==2.4.2
pyparsing==2.4.7
pytest==6.2.5
python-dateutil==2.8.2
pytz==2021.3
quickle==0.4.0
redis==3.5.3
regex==2021.10.8
requests==2.26.0
rq==1.10.0
rq-dashboard==0.6.1
scipy==1.7.1
shared-memory-dict==0.6.0
six==1.16.0
supervisor==4.2.2
toml==0.10.2
tqdm==4.62.3
typing-extensions==3.10.0.2
urllib3==1.26.7
Werkzeug==2.0.2
zipp==3.6.0
flask-bcrypt==0.7.1
flask-bcrypt==0.7.1
pymongo==4.0.1
Werkzeug==2.0.2
68 changes: 67 additions & 1 deletion src/indexing/index.py
@@ -3,17 +3,22 @@
import math
import os
import gc
import time
import pymongo
from pymongo import errors
import indexing.km_util as util
from indexing.abstract_catalog import AbstractCatalog

delim = '\t'
mongo_cache = None

class Index():
def __init__(self, pubmed_abstract_dir: str):
# caches
self._query_cache = dict()
self._token_cache = dict()
self._n_articles_by_pub_year = dict()
_connect_to_mongo()

self._pubmed_dir = pubmed_abstract_dir
self._bin_path = util.get_index_file(pubmed_abstract_dir)
@@ -33,6 +38,11 @@ def query_index(self, query: str) -> 'set[int]':

if query in self._query_cache:
return self._query_cache[query]
else:
result = _check_mongo_for_query(query)
if not isinstance(result, type(None)):
self._query_cache[query] = result
return result

tokens = util.get_tokens(query)

@@ -42,7 +52,12 @@ def query_index(self, query: str) -> 'set[int]':
return set()

result = self._query_disk(tokens)

if len(result) < 10000 or len(tokens) > 1:
_place_in_mongo(query, result)

self._query_cache[query] = result

return result

def censor_by_year(self, pmids: 'set[int]', censor_year: int) -> 'set[int]':
@@ -190,4 +205,55 @@ def _intersect_dict_keys(dicts: 'list[dict]'):
key_intersect.remove(key)
break

return key_intersect
return key_intersect

def _connect_to_mongo():
# TODO: set expiration time for cached items (72h, etc.?)
# mongo_cache.create_index('query', unique=True) #expireafterseconds=72 * 60 * 60,
global mongo_cache
try:
loc = 'mongo'
client = pymongo.MongoClient(loc, 27017, serverSelectionTimeoutMS = 500, connectTimeoutMS = 500)
db = client["query_cache_db"]
mongo_cache = db["query_cache"]
mongo_cache.create_index('query', unique=True)
except:
print('warning: could not find a MongoDB instance to use as a query cache')
mongo_cache = None

def _check_mongo_for_query(query: str):
if not isinstance(mongo_cache, type(None)):
try:
result = mongo_cache.find_one({'query': query})
except:
print('warning: non-fatal error in retrieving from mongo')
return None

if not isinstance(result, type(None)):
return set(result['result'])
else:
return None
else:
return None


def _place_in_mongo(query, result):
if not isinstance(mongo_cache, type(None)):
try:
mongo_cache.insert_one({'query': query, 'result': list(result)})
except errors.DuplicateKeyError:
# tried to insert and got a duplicate key error. probably just the result
# of a race condition (another worker added the query record).
# it's fine, just continue on.
pass
except errors.AutoReconnect:
# not sure what this error is. seems to throw occasionally. just ignore it.
print('warning: non-fatal AutoReconnect error in inserting to mongo')
pass
else:
pass

def _empty_mongo():
if not isinstance(mongo_cache, type(None)):
x = mongo_cache.delete_many({})
print('mongodb cache cleared, ' + str(x.deleted_count) + ' items were deleted')
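Regarding the TODO in _connect_to_mongo about expiring cached items: MongoDB supports this natively with TTL indexes. A hedged sketch of what that could look like, using the 72-hour expiry the commented-out code suggests; it assumes each cached document also stores an insertion timestamp, which the current insert_one calls do not:

import datetime
import pymongo

client = pymongo.MongoClient('mongo', 27017, serverSelectionTimeoutMS=500)
cache = client['query_cache_db']['query_cache']

# a TTL index deletes a document once its indexed datetime field is older
# than expireAfterSeconds; the background cleanup runs roughly once a minute
cache.create_index('created_at', expireAfterSeconds=72 * 60 * 60)

# inserts would then need to carry the timestamp field, e.g.:
cache.insert_one({'query': 'some query',
                  'result': [12345, 67890],
                  'created_at': datetime.datetime.utcnow()})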
3 changes: 3 additions & 0 deletions src/indexing/index_builder.py
@@ -4,6 +4,7 @@
import indexing.km_util as util
from indexing.abstract import Abstract
from indexing.abstract_catalog import AbstractCatalog
import indexing.index

delim = '\t'

@@ -19,6 +20,8 @@ def build_index(self, dump_rate = 300000, overwrite_old = True):
abstract_catalog.catalog.clear() # saves RAM

print('building index...')
indexing.index._empty_mongo()

# build the index
catalog_path = util.get_abstract_catalog(self.path_to_pubmed_abstracts)
cold_storage = dict()
6 changes: 5 additions & 1 deletion src/run_server.py
@@ -1,11 +1,15 @@
import time
import argparse
import server.app as app

parser = argparse.ArgumentParser()
parser.add_argument('-p', '--pw_hash', default='none')
args = parser.parse_args()

def main():
print('server waiting 10 sec for redis to set up...')
time.sleep(10)

import server.app as app
app.start_server(args.pw_hash)

if __name__ == '__main__':
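The fixed 10-second sleep is a blunt way to wait for redis; a hedged alternative is to poll until redis answers a ping. A sketch, assuming the 'redis' service hostname used elsewhere in this stack:

import time
from redis import Redis

def wait_for_redis(host='redis', timeout=30.0):
    # return as soon as redis answers a ping, or raise after `timeout` seconds
    r = Redis(host=host)
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            if r.ping():
                return
        except Exception:
            pass  # redis not accepting connections yet
        time.sleep(0.5)
    raise RuntimeError('redis did not come up within %s seconds' % timeout)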
11 changes: 8 additions & 3 deletions src/server/app.py
@@ -4,7 +4,7 @@
from redis import Redis
from rq import Queue
from flask_restful import Api
from workers.work import km_work, skim_work, triple_miner_work, update_index_work
from workers.work import km_work, skim_work, triple_miner_work, update_index_work, clear_mongo_cache
import logging
from flask_bcrypt import Bcrypt

@@ -69,7 +69,7 @@ def _post_generic(work, request, job_timeout = 43200):

def _get_generic(request):
if not _authenticate(request):
return 'Invalid password. do request.post(..., auth=(\'username\', \'password\'))', 401
return 'Invalid password. do request.get(..., auth=(\'username\', \'password\'))', 401

id = request.args['id']
job = _q.fetch_job(id)
@@ -126,4 +126,9 @@ def _post_update_index_job():

@_app.route('/update_index/api/jobs/', methods=['GET'])
def _get_update_index_job():
return _get_generic(request)
return _get_generic(request)

## ******** Clear MongoDB Cache Post ********
@_app.route('/clear_cache/api/jobs/', methods=['POST'])
def _post_clear_cache_job():
return _post_generic(clear_mongo_cache, request)
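A hedged client-side sketch of hitting the new endpoint, following the basic-auth pattern the error messages above describe. The host port 5099 comes from the docker-compose example, and the empty JSON body matches the "add json params to mongo clear" commit message; both are assumptions about a typical deployment:

import requests

# queues a clear-cache job on the server; the username is arbitrary,
# only the password is checked against PASSWORD_HASH
resp = requests.post('http://localhost:5099/clear_cache/api/jobs/',
                     json={},
                     auth=('username', 'password'))
print(resp.status_code, resp.text)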
20 changes: 15 additions & 5 deletions src/workers/work.py
@@ -1,4 +1,5 @@
import math
import indexing.index
from rq import get_current_job, Queue
from rq.worker import Worker
from redis import Redis
@@ -12,10 +13,11 @@
_q = Queue(connection=_r)

def km_work(json: list):
indexing.index._connect_to_mongo()
return_val = []

if len(json) > 100000:
raise ValueError('Must be <=100000 queries')
if len(json) > 1000000000:
raise ValueError('Must be <=1000000000 queries')

for item in json:
a_term = item['a_term']
@@ -48,6 +50,7 @@ def km_work(json: list):
return return_val

def skim_work(json: dict):
indexing.index._connect_to_mongo()
return_val = []

a_terms = json['a_terms']
@@ -123,6 +126,7 @@ def skim_work(json: dict):
return return_val

def triple_miner_work(json: list):
indexing.index._connect_to_mongo()
km_set = []

for query in json:
@@ -142,6 +146,7 @@ def triple_miner_work(json: list):
return km_work(km_set)

def update_index_work(json: dict):
indexing.index._connect_to_mongo()
if 'n_files' in json:
n_files = json['n_files']
else:
@@ -171,19 +176,24 @@ def update_index_work(json: dict):
index_builder = IndexBuilder(li.pubmed_path)
index_builder.build_index(overwrite_old=False) # wait to remove old index

# restart the workers
# restart the workers (TODO: except this one)
_update_job_status('progress', 'restarting workers')
interrupted_jobs = restart_workers(requeue_interrupted_jobs=False)
interrupted_jobs = _restart_workers(requeue_interrupted_jobs=False)

# remove the old index
index_builder.overwrite_old_index()
clear_mongo_cache()

# re-queue interrupted jobs
_queue_jobs(interrupted_jobs)

_update_job_status('progress', 'finished')

def restart_workers(requeue_interrupted_jobs = True):
def clear_mongo_cache(json):
indexing.index._connect_to_mongo()
indexing.index._empty_mongo()

def _restart_workers(requeue_interrupted_jobs = True):
print('restarting workers...')
workers = Worker.all(_r)

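For context, job functions like these are queued through rq. A hedged sketch of what enqueueing clear_mongo_cache could look like, mirroring the Queue setup at the top of this file; the exact call inside _post_generic is not shown in this diff:

from redis import Redis
from rq import Queue

from workers.work import clear_mongo_cache

q = Queue(connection=Redis(host='redis'))

# each rq worker runs jobs in its own process, which is why every job
# function above starts by calling indexing.index._connect_to_mongo()
job = q.enqueue(clear_mongo_cache, {}, job_timeout=43200)
print(job.id)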
