From 2cdbeb87c82c059fb8a3edfc013629cd18745ae7 Mon Sep 17 00:00:00 2001
From: Julio Perez <37191411+jperez999@users.noreply.github.com>
Date: Tue, 14 Jan 2025 13:31:53 -0500
Subject: [PATCH] =?UTF-8?q?fixes=20to=20handle=20upgrades=20in=20milvus=20?=
 =?UTF-8?q?packages=20and=20fix=20for=20gpu-to-cpu=20in=E2=80=A6=20(#325)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 client/src/nv_ingest_client/util/milvus.py | 79 +++++++++++++++-------
 1 file changed, 56 insertions(+), 23 deletions(-)

diff --git a/client/src/nv_ingest_client/util/milvus.py b/client/src/nv_ingest_client/util/milvus.py
index 9e6e3230..744bef63 100644
--- a/client/src/nv_ingest_client/util/milvus.py
+++ b/client/src/nv_ingest_client/util/milvus.py
@@ -54,7 +54,9 @@ def create_nvingest_schema(dense_dim: int = 1024, sparse: bool = False) -> Colle
     return schema
 
 
-def create_nvingest_index_params(sparse: bool = False, gpu_index: bool = True, gpu_search: bool = False) -> IndexParams:
+def create_nvingest_index_params(
+    sparse: bool = False, gpu_index: bool = True, gpu_search: bool = False, local_index: bool = True
+) -> IndexParams:
     """
     Creates index params necessary to create an index for a collection. At a minimum,
     this function will create a dense embedding index but can also create a sparse
@@ -78,27 +80,35 @@ def create_nvingest_index_params(sparse: bool = False, gpu_index: bool = True, g
         embedding index.
     """
     index_params = MilvusClient.prepare_index_params()
-    if gpu_index:
+    if local_index:
         index_params.add_index(
             field_name="vector",
             index_name="dense_index",
-            index_type="GPU_CAGRA",
+            index_type="FLAT",
             metric_type="L2",
-            params={
-                "intermediate_graph_degree": 128,
-                "graph_degree": 64,
-                "build_algo": "NN_DESCENT",
-                "adapt_for_cpu": "false" if gpu_search else "true",
-            },
         )
     else:
-        index_params.add_index(
-            field_name="vector",
-            index_name="dense_index",
-            index_type="HNSW",
-            metric_type="L2",
-            params={"M": 64, "efConstruction": 512},
-        )
+        if gpu_index:
+            index_params.add_index(
+                field_name="vector",
+                index_name="dense_index",
+                index_type="GPU_CAGRA",
+                metric_type="L2",
+                params={
+                    "intermediate_graph_degree": 128,
+                    "graph_degree": 64,
+                    "build_algo": "NN_DESCENT",
+                    "adapt_for_cpu": "false" if gpu_search else "true",
+                },
+            )
+        else:
+            index_params.add_index(
+                field_name="vector",
+                index_name="dense_index",
+                index_type="HNSW",
+                metric_type="L2",
+                params={"M": 64, "efConstruction": 512},
+            )
     if sparse:
         index_params.add_index(
             field_name="sparse",
@@ -178,6 +188,7 @@ def create_nvingest_collection(
         Returns a milvus collection schema, that represents the fields in
         the created collection.
     """
+    local_index = False
     if urlparse(milvus_uri).scheme:
         connections.connect(uri=milvus_uri)
         server_version = utility.get_server_version()
@@ -185,9 +196,14 @@ def create_nvingest_collection(
             gpu_index = False
     else:
         gpu_index = False
+        if milvus_uri.endswith(".db"):
+            local_index = True
+
     client = MilvusClient(milvus_uri)
     schema = create_nvingest_schema(dense_dim=dense_dim, sparse=sparse)
-    index_params = create_nvingest_index_params(sparse=sparse, gpu_index=gpu_index, gpu_search=gpu_search)
+    index_params = create_nvingest_index_params(
+        sparse=sparse, gpu_index=gpu_index, gpu_search=gpu_search, local_index=local_index
+    )
     create_collection(client, collection_name, schema, index_params, recreate=recreate)
 
 
@@ -535,7 +551,6 @@ def dense_retrieval(
         collection_name=collection_name,
         data=dense_embeddings,
         anns_field=dense_field,
-        param={"metric_type": "L2"},
         limit=top_k,
         output_fields=output_fields,
     )
@@ -552,6 +567,8 @@ def hybrid_retrieval(
     dense_field: str = "vector",
     sparse_field: str = "sparse",
     output_fields: List[str] = ["text"],
+    gpu_search: bool = False,
+    local_index: bool = False,
 ):
     """
     This function takes the input queries and conducts a hybrid
@@ -591,15 +608,20 @@ def hybrid_retrieval(
         dense_embeddings.append(dense_model.get_query_embedding(query))
         sparse_embeddings.append(_format_sparse_embedding(sparse_model.encode_queries([query])))
 
+    s_param_1 = {
+        "metric_type": "L2",
+    }
+    if not gpu_search and not local_index:
+        s_param_1["params"] = {"ef": top_k * 2}
+
     # Create search requests for both vector types
     search_param_1 = {
         "data": dense_embeddings,
         "anns_field": dense_field,
-        "param": {
-            "metric_type": "L2",
-        },
+        "param": s_param_1,
         "limit": top_k,
     }
+
     dense_req = AnnSearchRequest(**search_param_1)
 
     search_param_2 = {
@@ -628,6 +650,7 @@ def nvingest_retrieval(
     sparse_model_filepath: str = "bm25_model.json",
     model_name: str = "nvidia/nv-embedqa-e5-v5",
     output_fields: List[str] = ["text", "source", "content_metadata"],
+    gpu_search: bool = False,
 ):
     """
     This function takes the input queries and conducts a hybrid/dense
@@ -665,14 +688,24 @@ def nvingest_retrieval(
     List
         Nested list of top_k results per query.
     """
+    local_index = False
     embed_model = NVIDIAEmbedding(base_url=embedding_endpoint, model=model_name)
     client = MilvusClient(milvus_uri)
-
+    if milvus_uri.endswith(".db"):
+        local_index = True
     if hybrid:
         bm25_ef = BM25EmbeddingFunction(build_default_analyzer(language="en"))
         bm25_ef.load(sparse_model_filepath)
         results = hybrid_retrieval(
-            queries, collection_name, client, embed_model, bm25_ef, top_k, output_fields=output_fields
+            queries,
+            collection_name,
+            client,
+            embed_model,
+            bm25_ef,
+            top_k,
+            output_fields=output_fields,
+            gpu_search=gpu_search,
+            local_index=local_index,
         )
     else:
         results = dense_retrieval(queries, collection_name, client, embed_model, top_k, output_fields=output_fields)