Adds full OpenSearch engine support (#200)
Use OpenSearch with all the notebooks by running:
docker compose up opensearch

* - 4.7 - listing organization
- skg.generate_request => skg.transform_request
- Fixes Ch6 bugs
- 15.18 listing organization

* - Applies listing print standards to all chapters
- Ch3. Updates print statements, organizes variable positioning
- rerank_quantity => rerank_count
- What happened to 10's feature calculator?
- Finish rerank_query refactor
- Chapter 11: Updates formatting of displaying data
- SKG: {!${defType} ...}

* SKG debug fix

* Rework ch3 tf-idf calculations; sync with manuscript

* Ch4, 5, 6: Listings and results synchronized

* Ch7 and Ch8: Listings synced

* Ch9 mostly consistent

* Syncs listing 8.1

* Ch10, Ch11 sync'd with manuscript and signed off

* Ch8 sync, ch10 n => k

* ch9: Correct, Deterministic, sync'd with manuscript, signing off

* - ch12, 13, 14, and 15 finalized and sync with Manuscript

Ch12 still has minor numeric variance.
Ch14 still uses pre-computed model

* quantization WIP

* Intermediate Ch13 Quantization Faiss impl

* Further improvements to quantization code. Still needs cleanup and Listing 13.26 implemented

* Another checkpoint. 3/5 working (PQ and engine left). Code 50% cleaned

* Temp state. Just for reference. Mid refactor

* Listing 13.21 int8 scalar quantization done

* Another checkpoint

* Adds stubbed rerank recall method

* Small refactors as I'm working on manuscript listings

* Another checkpoint.

* Fix index writing error w/ PQ, increase M to get PQ recall up

* Checkpoint on evaluate rerank

* Checkpoint, Only one bug in calculate_recall(). Picking up engine search in the morning

* Cleanup quantization code (minus last engine example) + rename notebook

* Include SolrEngine

* Reorganize to match manuscript listing flow

* Refactor and cleanup quantization code for listings

* Quantization

* Revert back from quantization_type to quantization_size

Reducing scope. Only scalar and binary will be supported, and both of them work fine with "size".

* Finalizes quantization listings

* Cleanup to sync with manuscript

* - All chapters verified, functional and consistent (See lingering issues)
- Sets Jupyter to use environment home directory instead of notebook directory for execution
- Product Display: Updates templates\search-results.html product search result html rendering to have better spacing and image sizes
- Adds missing product images that emerged from removing "\N"
- SolrCollection Vector Search: De-normalize returned score to be accurate with Cos/Dot similarity
--------------------------------------------
- Ch5: Parameterizes print_graph() to de-dup
- Ch5: Remove old sections at end of SKG notebook
- Ch6: Reduces listing complexity by extracting noisy print logic
- Ch6: Tidies outputs and corrects organization of listings across cha
- Ch6: Adds missing Listing 6.4 to notebook
- Ch6: Refactors various spell check functions for specificity
- Ch7: Further refactors SS functions
- Ch7: Organizes and orders listings (and links/refs) from each notebook to be consistent with manuscript
- Ch7: Fixes semantic function bug and get_enrichment NRE bug
- Ch7: Adds Listing 7.14 Splade
- Ch8: Minor formatting, typo correction and cell order
- Ch9: Minor formatting, search result limit=10, and small refactorings
- Ch10: Correctly uses LTR.enable_ltr (not engine.enable_ltr)
- Ch11: Cell organization/labelling
- Ch12: Locks numpy to 1.23.5 and adds Numpy seed for consistent session synthesis
- Ch13: Cell organization, consistent result ranking, formatting, Fix 13.12 bug,
- Ch13: De-normalize vector scores from engine vector search
- Ch13: Tidies cross-encoder code
- Ch14: Minor formatting, path corrections
------------------------------------------
Lingering issues:
- Image path: Undo hack when image is rendered from HTML (not using Jupyter Home path, using Chapter path), Fix image renderings in Ch11
- Ch7: Still needs fix for Iframe clearing 5 seconds after being rendered, verify semantic-search endpoint
- Ch12: Still 2 squirrelly results. 99% consistent otherwise
- Ch13: 13.3 Amendum failing (Has been for some time?)

* Finalizes cross encoder code

* Basic OS implementation

* OpenSearch Spell checking

* Vector search, schema improvements, spark

* UBI: UBI Plugin added to image, docker organization, opensearch configs, basic

* Generating Signals collection from UBI

* Unsaved

* Fixes signals boosting

* This is a checkpoint

* Make opensearch-docker-entrypoint.sh executable

* Fixes build.

* Fixes build and correctly integrates with ubi 1.0.0 for OS 2.14

* Docker file updates

* check

* check again

* check again

* check again again

* Fixes keystroke error that broke 9.18

* Check

* check

* Updates, organizes and cleans dep stack. Updates applied throughout the stack

* OpenSearch integration:
- Abstract all references to "solr"
- Fixes filter issue (with *)
- Fixes outdoor schema
- Partial cleaning imports
- implements engine.name (never used in codebase, but mentioned in literature as an available property)
---
Pending known issues:
supplement query to account for edismax logic

* Intermediate checkpoint. Still fixing Ch9 and cleaning

* - Adds optional engine override param
- Intermediate locking of Solr dependent chapters to Solr
- Fixes various bugs regarding opensearch integrations

* Ch8 completely functional except index time boosting query side

* Chapter 8 finished for Opensearch except with Spark Bulk Indexing failures in 8.8

* UBI Checkpoint: 3 pending issues listed in notebook

* Corrects two queries missing un-aliased variable references

* Corrects 8.8

* Fix index loader so 8.8 runs + improve 8.8 variables/revert line order

* Add back new line near end of listing 8.8

* OS LTR all features except model idempotency and explore candidates. Needs cleaning

* All LTR working except fuzzy/bigrams

* OpenSearch LTR complete

* Corrects ch 13, 14, 15. improves 8.8

* - Implements Opensearch Sparse Semantic Search completely semantic functions
- Finalizes SemanticSearch abstractions and loading
- Opensearch code cleaned

* All chapters verified

* Corrects vector size for outdoors embedding collection

* - Removes all usages and importing of set_engine
- Removes many other unused imports
- Adds collection.get_engine_name() which is used in several multi-methods
- Identified spark view from collection creation not using "_id" for ids. Issue does not currently affect codebase
- Adds versions to 4 un-versioned dependencies
- removes set_engine example from welcome (and fixes bad English)
- Fixes health check messages
- implements to_queries for semantic search query generation which got lost somewhere along the way

Chapter fixes:
- Ch4 Cleans up logs
- Ch5 Removes hard coding of engine and adds dual-indexing
- Ch6 Removes engine hardcoding, reruns notebooks with default engine
- Ch7 Reruns notebooks with default engine
- Ch8 removes hard coding
- Ch9 Verifies Opensearch functionality and reruns notebooks with default engine
- Ch10 Reruns with default engine, removes extra logging
- Ch12 Reruns with default engine, removes extra logging
- Ch13 Verifies Opensearch functionality, reruns with default search engine
- Ch14 re-verified chapter with default search engine

* - Reruns 6.2 to have no logging output
- Fixes id being returned from spark for opensearch
- dual indexing for ch7

* Add link to engines/README.md
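
Note on the "De-normalize ... score" items above (SolrCollection vector search and Ch13): the commit's own implementation is not shown in this excerpt. The following is only a minimal sketch, assuming the engine reports cosine and dot-product similarity as (1 + s) / 2, which is the mapping Lucene's vector similarity functions use. `denormalize_score` is a hypothetical name, not a function from this commit.

```python
def denormalize_score(engine_score):
    """Invert the assumed (1 + s) / 2 mapping to recover the raw similarity s."""
    return 2.0 * engine_score - 1.0


# An engine score of 0.5 corresponds to a raw similarity of 0.0 (orthogonal vectors).
print(denormalize_score(0.5))
```

Under that assumption, raw cosine values land back in [-1, 1], which makes engine results comparable with similarities computed directly in NumPy or PyTorch.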

---------

Co-authored-by: Daniel Crouch <dcrouch26@users.noreply.github.com>
Co-authored-by: Trey Grainger <code@treygrainger.com>
3 people authored Dec 13, 2024
1 parent bd98a45 commit dd6233a
Showing 69 changed files with 22,856 additions and 3,314 deletions.
62 changes: 34 additions & 28 deletions aips/__init__.py
@@ -1,50 +1,57 @@
import aips.environment as environment
from aips.environment import SOLR_URL
from engines.solr import SolrLTR, SolrSemanticKnowledgeGraph, SolrEntityExtractor, SolrSparseSemanticSearch
from engines.solr.SolrEngine import SolrEngine
from engines.solr.SolrCollection import SolrCollection

from engines.opensearch.OpenSearchCollection import OpenSearchCollection
from engines.opensearch.OpenSearchEngine import OpenSearchEngine
from engines.opensearch.OpenSearchLTR import OpenSearchLTR
from engines.opensearch.OpenSearchSparseSemanticSearch import OpenSearchSparseSemanticSearch

import os
from IPython.display import display,HTML
from IPython.display import display, HTML
import pandas
import re
import requests

engine_type_map = {"SOLR": SolrEngine()}
engine_type_map = {"SOLR": SolrEngine(),
"OPENSEARCH": OpenSearchEngine()}

def get_engine():
return engine_type_map[environment.get("AIPS_SEARCH_ENGINE", "SOLR")]
def get_engine(override=None):
engine_name = override.upper() if override else environment.get("AIPS_SEARCH_ENGINE", "SOLR")
return engine_type_map[engine_name]

def set_engine(engine_name):
def set_engine(engine_name):
engine_name = engine_name.upper()
if engine_name not in engine_type_map:
raise ValueError(f"No search engine implementation found for {engine_name}")
else:
environment.set("AIPS_SEARCH_ENGINE", engine_name)

def get_ltr_engine(collection):
return SolrLTR(collection)
def get_ltr_engine(collection):
ltr_engine_map = {SolrCollection: SolrLTR,
OpenSearchCollection: OpenSearchLTR}
return ltr_engine_map[type(collection)](collection)

def get_semantic_knowledge_graph(collection):
return SolrSemanticKnowledgeGraph(collection)
return SolrSemanticKnowledgeGraph(get_engine("solr").get_collection(collection.name))

def get_entity_extractor(collection):
return SolrEntityExtractor(collection)
return SolrEntityExtractor(get_engine("solr").get_collection(collection.name))

def get_sparse_semantic_search():
return SolrSparseSemanticSearch()
SSS_map = {SolrEngine: SolrSparseSemanticSearch,
OpenSearchEngine: OpenSearchSparseSemanticSearch}
return SSS_map[type(get_engine())]()

def healthcheck():
try:
if get_engine().health_check():
print("Solr is up and responding.")
print("Zookeeper is up and responding.\n")
print("All Systems are ready. Happy Searching!")
else:
print("The search engine is not in a ready state.")
except:
print("Error! One or more containers are not responding.\nPlease follow the instructions in Appendix A.")

def print_status(solr_response):
print("Status: Success" if solr_response["responseHeader"]["status"] == 0 else "Status: Failure; Response:[ " + str(solr_response) + " ]" )


def num2str(number):
return str(round(number,4)) #round to 4 decimal places for readibility
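
Note on `get_engine(override=None)` in the hunk above: a minimal, self-contained sketch of the lookup order it appears to implement (explicit override first, then the configured engine, then Solr). The stub classes are hypothetical stand-ins for `SolrEngine` and `OpenSearchEngine`, and `os.environ` is used here in place of the repository's `aips.environment` config helper; both are assumptions for illustration. The sketch also validates the name, which the committed `get_engine` leaves to a `KeyError`.

```python
import os


class SolrEngine:
    name = "solr"


class OpenSearchEngine:
    name = "opensearch"


# Hypothetical stand-ins for the engine instances registered in aips/__init__.py.
ENGINE_TYPE_MAP = {"SOLR": SolrEngine(), "OPENSEARCH": OpenSearchEngine()}


def get_engine(override=None):
    """Return the engine named by `override`, else AIPS_SEARCH_ENGINE, else Solr."""
    engine_name = (override or os.environ.get("AIPS_SEARCH_ENGINE", "SOLR")).upper()
    if engine_name not in ENGINE_TYPE_MAP:
        raise ValueError(f"No search engine implementation found for {engine_name}")
    return ENGINE_TYPE_MAP[engine_name]


print(get_engine("opensearch").name)
```

One design point worth checking in review: `get_ltr_engine` and `get_sparse_semantic_search` key their maps on `type(...)`, so a subclass of `SolrCollection` or `SolrEngine` would not match and would raise `KeyError`.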

@@ -66,7 +73,7 @@ def images_directory():
relative = "../.."
return f"{relative}/data/retrotech/images"

def img_path_for_upc(product):
def img_path_for_product(product):
directory = images_directory()
file = product.get("upc", "no-upc")
if not os.path.exists(f"data/retrotech/images/{file}.jpg"):
@@ -106,7 +113,7 @@ def render_search_results(query, results):

rendered = header_template.replace("${QUERY}", query.replace('"', '\"'))
for result in results:
image_url = img_path_for_upc(result)
image_url = img_path_for_product(result)
rendered += results_template.replace("${NAME}", result.get("name", "UNKNOWN")) \
.replace("${MANUFACTURER}", result.get("manufacturer", "UNKNOWN")) \
.replace("${IMAGE_URL}", image_url)
@@ -115,15 +122,14 @@ def render_search_results(query, results):
return rendered

def fetch_products(doc_ids):
doc_ids = ["%s" % doc_id for doc_id in doc_ids]
query = "upc:( " + " OR ".join(doc_ids) + " )"
params = {'q': query, 'wt': 'json', 'rows': len(doc_ids)}
resp = requests.get(f"{SOLR_URL}/products/select", params=params)
df = pandas.DataFrame(resp.json()['response']['docs'])
request = {"query": " ".join([str(id) for id in doc_ids]),
"query_fields": ["upc"],
"limit": len(doc_ids)}
response = get_engine().get_collection("products").search(**request)

df = pandas.DataFrame(response["docs"])
df['upc'] = df['upc'].astype('int64')

df.insert(0, 'image', df.apply(lambda row: "<img height=\"100\" src=\"" + img_path_for_upc(row) + "\">", axis=1))

df.insert(0, 'image', df.apply(lambda row: "<img height=\"100\" src=\"" + img_path_for_product(row) + "\">", axis=1))
return df

def render_judged(products, judged, grade_col='ctr', label=""):
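
Note on the `fetch_products` change above: the Solr-specific `upc:( a OR b )` query string is replaced by an engine-agnostic request dict. A minimal sketch of building that dict follows. `build_products_request` is a hypothetical helper name, and the sketch assumes the collection's `search` treats space-separated terms as alternatives (OR), which this excerpt does not confirm.

```python
def build_products_request(doc_ids):
    """Build an engine-agnostic request that looks up products by UPC."""
    terms = [str(doc_id) for doc_id in doc_ids]
    return {"query": " ".join(terms),   # one term per UPC
            "query_fields": ["upc"],    # match against the upc field only
            "limit": len(terms)}        # at most one hit per requested UPC


print(build_products_request([123, "456"]))
```

Naming the loop variable `doc_id` rather than `id` also avoids shadowing the Python builtin, as the committed list comprehension does.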
30 changes: 30 additions & 0 deletions aips/data_loaders/index_time_boosts.py
@@ -0,0 +1,30 @@
from pyspark.sql.functions import collect_list, create_map
from aips.spark.dataframe import from_sql
from aips.spark import create_view_from_collection

def load_dataframe(boosted_products_collection, boosts_collection):
    assert(type(boosted_products_collection) == type(boosts_collection))
    create_view_from_collection(boosts_collection,
                                boosts_collection.name)
    create_view_from_collection(boosted_products_collection,
                                boosted_products_collection.name)
    match boosted_products_collection.get_engine_name():
        case "solr":
            query = f"""SELECT p.*, b.signals_boosts FROM (
                SELECT doc, CONCAT_WS(',', COLLECT_LIST(CONCAT(query, '|', boost)))
                AS signals_boosts FROM {boosts_collection.name} GROUP BY doc
            ) b INNER JOIN {boosted_products_collection.name} p ON p.upc = b.doc"""
            boosts_dataframe = from_sql(query)
        case "opensearch":
            product_query = f"SELECT * FROM {boosted_products_collection.name}"
            boosts_query = f"SELECT doc, boost, REPLACE(query, '.', '') AS query FROM {boosts_collection.name}"

            grouped_boosts = from_sql(boosts_query).groupBy("doc") \
                .agg(collect_list(create_map("query", "boost"))[0].alias("signals_boost")) \
                .withColumnRenamed("doc", "upc")

            boosts_dataframe = from_sql(product_query).join(grouped_boosts, "upc")
        case _:
            raise Exception(f"Index time boost not implemented for {type(boosted_products_collection)}")

    return boosts_dataframe
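
Note on the two branches above: they produce differently shaped boost fields. The Solr branch builds one delimited `query|boost,query|boost` string per document (column `signals_boosts`), while the OpenSearch branch builds a map keyed by the query with `.` removed (column `signals_boost`). The pure-Python sketch below illustrates both shapes without Spark. The function names and sample rows are made up for illustration. As written in the diff, the OpenSearch branch keeps element `[0]` of the collected list; the sketch instead merges every pair into one map, which is an assumption about the intended result and not what the committed code does.

```python
from collections import defaultdict


def solr_signals_boosts(rows):
    """Group (doc, query, boost) rows into one 'query|boost,...' string per doc."""
    grouped = defaultdict(list)
    for doc, query, boost in rows:
        grouped[doc].append(f"{query}|{boost}")
    return {doc: ",".join(pairs) for doc, pairs in grouped.items()}


def opensearch_signals_boosts(rows):
    """Group rows into one {query: boost} map per doc, stripping '.' from queries."""
    grouped = defaultdict(dict)
    for doc, query, boost in rows:
        grouped[doc][query.replace(".", "")] = boost
    return dict(grouped)


# Made-up sample rows: (doc, query, boost).
rows = [("doc1", "ipad", 10), ("doc1", "ipad 2", 3), ("doc2", "st. elmo", 1)]
print(solr_signals_boosts(rows))
print(opensearch_signals_boosts(rows))
```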
2 changes: 2 additions & 0 deletions aips/data_loaders/products.py
@@ -1,9 +1,11 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf

def load_dataframe(csv_file):
    print("Loading Products")
    spark = SparkSession.builder.appName("AIPS").getOrCreate()
    dataframe = spark.read.csv(csv_file, header=True, inferSchema=True)
    dataframe = dataframe.withColumn("upc", udf(str)(col("upc")))
    print("Schema: ")
    dataframe.printSchema()
    return dataframe
1 change: 1 addition & 0 deletions aips/data_loaders/reviews.py
@@ -6,6 +6,7 @@ def load_dataframe(csv_file):
spark = SparkSession.builder.appName("AIPS").getOrCreate()
dataframe = spark.read.csv(csv_file, inferSchema=True, header=True, multiLine=True, escape="\"") \
.select(col("id"), col("name_t").alias("business_name"),
col("name_s").alias("name"),
col("city_t").alias("city"),
col("state_t").alias("state"), col("text_t").alias("content"),
col("categories_t").alias("categories"), col("stars_i").alias("stars_rating"),
8 changes: 1 addition & 7 deletions aips/environment.py
@@ -7,12 +7,6 @@
AIPS_ZK_HOST = "aips-zk"
AIPS_ZK_PORT = os.getenv("AIPS_ZK_PORT") or "2181"

AIPS_SOLR_HOST = "aips-solr"
AIPS_SOLR_PORT = os.getenv("AIPS_SOLR_PORT") or "8983"
SOLR_URL = f"http://{AIPS_SOLR_HOST}:{AIPS_SOLR_PORT}/solr"
STATUS_URL = f"{SOLR_URL}/admin/zookeeper/status"
SOLR_COLLECTIONS_URL = f"{SOLR_URL}/admin/collections"

AIPS_WEBSERVER_HOST = os.getenv("AIPS_WEBSERVER_HOST") or "localhost"
AIPS_WEBSERVER_PORT = os.getenv("AIPS_WEBSERVER_PORT") or "2345"
WEBSERVER_URL = f"http://{AIPS_WEBSERVER_HOST}:{AIPS_WEBSERVER_PORT}"
@@ -27,7 +21,7 @@ def write_config(config):
json.dump(config, config_file)

def read_config():
with open(CONFIG_FILE_PATH, "r") as f:
with open(CONFIG_FILE_PATH, "r") as f:
return json.load(f)

def load_config():
26 changes: 26 additions & 0 deletions aips/indexers/product.py
@@ -0,0 +1,26 @@
from ltr.download import download, extract_tgz
from git import Repo # pip install gitpython


#Get datasets
![ ! -d 'retrotech' ] && git clone --depth 1 https://github.com/ai-powered-search/retrotech.git
! cd retrotech && git pull
! cd retrotech && mkdir -p '../data/retrotech/' && tar -xvf products.tgz -C '../data/retrotech/' && tar -xvf signals.tgz -C '../data/retrotech/'


dataset = ["https://github.com/ai-powered-search/tmdb/raw/main/judgments.tgz",
"https://github.com/ai-powered-search/tmdb/raw/main/movies.tgz"]
download(dataset, dest="")
extract_tgz("data/movies.tgz", "data/") # -> Holds "tmdb.json", big json dict with corpus
extract_tgz("data/judgments.tgz", "data/") # -> Holds "ai_pow_search_judgments.txt",
# which is our labeled judgment list
Repo.clone_from("https://github.com/ai-powered-search/retrotech.git", "data/retrotech/")

from aips.data_loaders.products import load_dataframe

products_collection = engine.create_collection("products")
products_dataframe = load_dataframe("data/retrotech/products.csv")
products_collection.write(products_dataframe)

signals_collection = engine.create_collection("signals")
signals_collection.write(from_csv("data/retrotech/signals.csv"))
24 changes: 22 additions & 2 deletions aips/spark/__init__.py
@@ -1,8 +1,28 @@
from pyspark.sql import SparkSession

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

from aips.environment import AIPS_ZK_HOST
from engines.opensearch.config import OPENSEARCH_URL

def create_view_from_collection(collection, view_name, spark=None):
if not spark:
spark = SparkSession.builder.appName("AIPS").getOrCreate()
opts = {"zkhost": AIPS_ZK_HOST, "collection": collection.name}
spark.read.format("solr").options(**opts).load().createOrReplaceTempView(view_name)
match collection.get_engine_name():
case "solr":
opts = {"zkhost": AIPS_ZK_HOST, "collection": collection.name}
spark.read.format("solr").options(**opts).load().createOrReplaceTempView(view_name)
case "opensearch":
parse_id_udf = udf(lambda s: s["_id"], StringType())
opts = {"opensearch.nodes": OPENSEARCH_URL,
"opensearch.net.ssl": "false",
"opensearch.read.metadata": "true"}
dataframe = spark.read.format("opensearch").options(**opts).load(collection.name)
if "_metadata" in dataframe.columns:
dataframe = dataframe.withColumn("id", parse_id_udf(col("_metadata")))
dataframe = dataframe.drop("_metadata")
print(dataframe.columns)
dataframe.createOrReplaceTempView(view_name)
case _:
raise NotImplementedError(type(collection))
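
Note on the OpenSearch branch above: with `opensearch.read.metadata` enabled, each row carries a `_metadata` map, and the UDF copies its `_id` into an `id` column before `_metadata` is dropped. The sketch below shows the same transformation on plain dicts rather than Spark rows. `promote_metadata_id` is a hypothetical name used only for illustration.

```python
def promote_metadata_id(row):
    """Copy row['_metadata']['_id'] into 'id' and drop '_metadata', if present."""
    if "_metadata" not in row:
        return dict(row)
    promoted = {key: value for key, value in row.items() if key != "_metadata"}
    promoted["id"] = row["_metadata"]["_id"]  # overwrites any existing 'id' value
    return promoted


print(promote_metadata_id({"_metadata": {"_id": "42"}, "name": "example"}))
```

Like `withColumn("id", ...)` in the diff, this replaces any `id` value already present in the source document.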
17 changes: 12 additions & 5 deletions build/Dockerfile
@@ -7,16 +7,23 @@ RUN sudo apt-get -y update && \
apt-get install -y --reinstall build-essential gcc cargo && \
rm -rf /var/lib/apt/lists/

# Spark dependencies
# Install Spark-Solr
ENV SPARK_SOLR_VERSION=4.0.2
ENV SHADED_SOLR_JAR_PATH=/usr/local/spark/lib/spark-solr-${SPARK_SOLR_VERSION}-shaded.jar

# Install Spark-Solr
RUN mkdir -p /usr/local/spark/lib/ && cd /usr/local/spark/lib/ && \
wget -q https://repo1.maven.org/maven2/com/lucidworks/spark/spark-solr/${SPARK_SOLR_VERSION}/spark-solr-${SPARK_SOLR_VERSION}-shaded.jar -O $SHADED_SOLR_JAR_PATH && \
echo "c5293f10257603bcf650780afcb91ed1bb118f09feb731502c2dc7ac14ba950e586a033cb2f50e5c122c5ec442dc0d2b55f76c4f6522b555e67f4981a38bca26 *spark-solr-${SPARK_SOLR_VERSION}-shaded.jar" \
| sha512sum -c - && chmod a+rwx $SHADED_SOLR_JAR_PATH

# Install Spark-OpenSearch
ENV SPARK_OS_VERSION=1.2.0
ENV SPARK_OS_JAR=opensearch-spark-30_2.12-${SPARK_OS_VERSION}.jar
ENV SPARK_OS_PATH=/usr/local/spark/lib/${SPARK_OS_JAR}
RUN cd /usr/local/spark/lib/ && \
wget -q https://repo1.maven.org/maven2/org/opensearch/client/opensearch-spark-30_2.12/${SPARK_OS_VERSION}/${SPARK_OS_JAR} -O $SPARK_OS_PATH && \
echo "5b9ae056b6ac21ae009f79a3a761774c7178b995fbe035572a4f35d5738e055d02828d2ec0ff98dd063ffffe37f4c48dc9a418d71269fc560f65b33c94493f2d *${SPARK_OS_JAR}" \
| sha512sum -c - && chmod a+rwx $SPARK_OS_PATH

WORKDIR /home/$NB_USER

# Install Python dependencies
@@ -48,8 +55,8 @@ RUN fix-permissions /home/$NB_USER
USER $NB_USER

# Spark Config
ENV SPARK_OPTS="$SPARK_OPTS --driver-java-options=\"-DXlint:none -Dlog4j.logLevel=error -Dallow-access=java.nio.DirectByteBuffer -Dlog4j.logger.org.apache.spark.repl.Main=ERROR\" --spark.ui.showConsoleProgress=False --spark.driver.extraLibraryPath=$SHADED_SOLR_JAR_PATH --spark.executor.extraLibraryPath=$SHADED_SOLR_JAR_PATH" \
PYSPARK_SUBMIT_ARGS="-c spark.driver.defaultJavaOptions=\"-DXlint=none -Dlog4j.logLevel=error -Dallow-access=java.nio.DirectByteBuffer\" -c spark.ui.showConsoleProgress=False --jars $SHADED_SOLR_JAR_PATH pyspark-shell" \
ENV SPARK_OPTS="$SPARK_OPTS --driver-java-options=\"-DXlint:none -Dlog4j.logLevel=error -Dallow-access=java.nio.DirectByteBuffer -Dlog4j.logger.org.apache.spark.repl.Main=ERROR\" --spark.ui.showConsoleProgress=False" \
PYSPARK_SUBMIT_ARGS="-c spark.driver.defaultJavaOptions=\"-DXlint=none -Dlog4j.logLevel=error -Dallow-access=java.nio.DirectByteBuffer\" -c spark.ui.showConsoleProgress=False --jars $SHADED_SOLR_JAR_PATH,$SPARK_OS_PATH pyspark-shell" \
PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-*-src.zip:%PYTHONPATH% \
DOCKER_STACKS_JUPYTER_CMD=lab

9 changes: 6 additions & 3 deletions build/requirements.txt
@@ -10,6 +10,7 @@ thinc==8.1.0
numba==0.60.0
wrapit==0.3.0
h5py==3.12.1
gitpython==3.1.43

#Forced versions to prevent implicit dep version errors
# (these sub packages are only used indirectly through normal deps)
@@ -20,16 +21,18 @@ statsmodels==0.14.4
networkx==3.3

#Normal deps
accelerate==0.30.0
accelerate==0.34.2
beautifulsoup4==4.12.3
lxml
lxml==5.3.0
datasets==3.0.1
jupyter-console
jupyter-console==6.6.3
matplotlib==3.9.2
nltk==3.9.1
nmslib==2.1.1
plotly==5.24.1
plotnine==0.13.5
openai-clip==1.0.1
torchvision==0.20.1
numpy==1.26.4
scipy==1.14.1
#scikit-learn==1.5.2