
Commit

fixes to handle upgrades in milvus packages and fix for gpu-to-cpu in…
jperez999 authored Jan 14, 2025
1 parent d17e4c3 commit 2cdbeb8
Showing 1 changed file with 56 additions and 23 deletions.
79 changes: 56 additions & 23 deletions client/src/nv_ingest_client/util/milvus.py
@@ -54,7 +54,9 @@ def create_nvingest_schema(dense_dim: int = 1024, sparse: bool = False) -> Colle
return schema


def create_nvingest_index_params(sparse: bool = False, gpu_index: bool = True, gpu_search: bool = False) -> IndexParams:
def create_nvingest_index_params(
sparse: bool = False, gpu_index: bool = True, gpu_search: bool = False, local_index: bool = True
) -> IndexParams:
"""
Creates index params necessary to create an index for a collection. At a minimum,
this function will create a dense embedding index but can also create a sparse
@@ -78,27 +80,35 @@ def create_nvingest_index_params(sparse: bool = False, gpu_index: bool = True, g
embedding index.
"""
index_params = MilvusClient.prepare_index_params()
if gpu_index:
if local_index:
index_params.add_index(
field_name="vector",
index_name="dense_index",
index_type="GPU_CAGRA",
index_type="FLAT",
metric_type="L2",
params={
"intermediate_graph_degree": 128,
"graph_degree": 64,
"build_algo": "NN_DESCENT",
"adapt_for_cpu": "false" if gpu_search else "true",
},
)
else:
index_params.add_index(
field_name="vector",
index_name="dense_index",
index_type="HNSW",
metric_type="L2",
params={"M": 64, "efConstruction": 512},
)
if gpu_index:
index_params.add_index(
field_name="vector",
index_name="dense_index",
index_type="GPU_CAGRA",
metric_type="L2",
params={
"intermediate_graph_degree": 128,
"graph_degree": 64,
"build_algo": "NN_DESCENT",
"adapt_for_cpu": "false" if gpu_search else "true",
},
)
else:
index_params.add_index(
field_name="vector",
index_name="dense_index",
index_type="HNSW",
metric_type="L2",
params={"M": 64, "efConstruction": 512},
)
if sparse:
index_params.add_index(
field_name="sparse",
@@ -178,16 +188,22 @@ def create_nvingest_collection(
Returns a milvus collection schema, that represents the fields in the created
collection.
"""
local_index = False
if urlparse(milvus_uri).scheme:
connections.connect(uri=milvus_uri)
server_version = utility.get_server_version()
if "lite" in server_version:
gpu_index = False
else:
gpu_index = False
if milvus_uri.endswith(".db"):
local_index = True

client = MilvusClient(milvus_uri)
schema = create_nvingest_schema(dense_dim=dense_dim, sparse=sparse)
index_params = create_nvingest_index_params(sparse=sparse, gpu_index=gpu_index, gpu_search=gpu_search)
index_params = create_nvingest_index_params(
sparse=sparse, gpu_index=gpu_index, gpu_search=gpu_search, local_index=local_index
)
create_collection(client, collection_name, schema, index_params, recreate=recreate)


@@ -535,7 +551,6 @@ def dense_retrieval(
collection_name=collection_name,
data=dense_embeddings,
anns_field=dense_field,
param={"metric_type": "L2"},
limit=top_k,
output_fields=output_fields,
)
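
Side note (editorial): dropping the hard-coded `param={"metric_type": "L2"}` lets the search inherit metric and search parameters from whichever index was actually built (FLAT, GPU_CAGRA, or HNSW) instead of assuming one layout. Per the surviving lines of this hunk, the dense search reduces to:

```python
# Shape of the simplified dense search after this change (values are the
# function's own locals; shown here only for reference).
results = client.search(
    collection_name=collection_name,
    data=dense_embeddings,
    anns_field=dense_field,
    limit=top_k,
    output_fields=output_fields,
)
```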
@@ -552,6 +567,8 @@ def hybrid_retrieval(
dense_field: str = "vector",
sparse_field: str = "sparse",
output_fields: List[str] = ["text"],
gpu_search: bool = False,
local_index: bool = False,
):
"""
This function takes the input queries and conducts a hybrid
@@ -591,15 +608,20 @@
dense_embeddings.append(dense_model.get_query_embedding(query))
sparse_embeddings.append(_format_sparse_embedding(sparse_model.encode_queries([query])))

s_param_1 = {
"metric_type": "L2",
}
if not gpu_search and not local_index:
s_param_1["params"] = {"ef": top_k * 2}

# Create search requests for both vector types
search_param_1 = {
"data": dense_embeddings,
"anns_field": dense_field,
"param": {
"metric_type": "L2",
},
"param": s_param_1,
"limit": top_k,
}

dense_req = AnnSearchRequest(**search_param_1)

search_param_2 = {
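
Reviewer-style note (not in the diff): `ef` is only meaningful for CPU-side graph search — HNSW, or GPU_CAGRA built with `adapt_for_cpu` — so it is attached only when `gpu_search` is False and the index is not a local FLAT one. Below is a sketch of how the two `AnnSearchRequest`s are typically fused; the `RRFRanker` and the sparse `IP` metric are assumptions, since the reranking part of the function sits outside this hunk:

```python
# Sketch only: fusing dense + sparse requests with pymilvus hybrid search.
from pymilvus import AnnSearchRequest, RRFRanker

dense_req = AnnSearchRequest(
    data=dense_embeddings, anns_field=dense_field, param=s_param_1, limit=top_k
)
sparse_req = AnnSearchRequest(
    data=sparse_embeddings,
    anns_field=sparse_field,
    param={"metric_type": "IP"},  # assumed metric for the BM25 sparse field
    limit=top_k,
)
results = client.hybrid_search(
    collection_name,
    [dense_req, sparse_req],
    RRFRanker(),
    limit=top_k,
    output_fields=output_fields,
)
```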
@@ -628,6 +650,7 @@ def nvingest_retrieval(
sparse_model_filepath: str = "bm25_model.json",
model_name: str = "nvidia/nv-embedqa-e5-v5",
output_fields: List[str] = ["text", "source", "content_metadata"],
gpu_search: bool = False,
):
"""
This function takes the input queries and conducts a hybrid/dense
@@ -665,14 +688,24 @@
List
Nested list of top_k results per query.
"""
local_index = False
embed_model = NVIDIAEmbedding(base_url=embedding_endpoint, model=model_name)
client = MilvusClient(milvus_uri)

if milvus_uri.endswith(".db"):
local_index = True
if hybrid:
bm25_ef = BM25EmbeddingFunction(build_default_analyzer(language="en"))
bm25_ef.load(sparse_model_filepath)
results = hybrid_retrieval(
queries, collection_name, client, embed_model, bm25_ef, top_k, output_fields=output_fields
queries,
collection_name,
client,
embed_model,
bm25_ef,
top_k,
output_fields=output_fields,
gpu_search=gpu_search,
local_index=local_index,
)
else:
results = dense_retrieval(queries, collection_name, client, embed_model, top_k, output_fields=output_fields)
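
Usage note (editorial, not part of the commit): the entry point now derives `local_index` from the URI and forwards it together with `gpu_search`, so callers describe their deployment once and the retrieval path adapts. A hedged example, leaving the embedding endpoint and model at their defaults; the query, collection name, and Lite path are placeholders:

```python
from nv_ingest_client.util.milvus import nvingest_retrieval

hits = nvingest_retrieval(
    ["What does the quarterly report say about revenue?"],
    collection_name="nv_ingest_collection",
    milvus_uri="milvus.db",   # Milvus Lite -> local_index=True is set internally
    hybrid=True,              # needs the saved bm25_model.json sparse model
    top_k=5,
    gpu_search=False,         # CPU search; "ef" only added on non-local deployments
)
print(hits[0])  # nested list: top_k hits for the first query
```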
