diff --git a/py/plugins/vertex-ai/pyproject.toml b/py/plugins/vertex-ai/pyproject.toml index 425bee532e..ff2dd3c283 100644 --- a/py/plugins/vertex-ai/pyproject.toml +++ b/py/plugins/vertex-ai/pyproject.toml @@ -18,10 +18,13 @@ classifiers = [ ] dependencies = [ "genkit", + "google-genai>=1.7.0", "google-cloud-aiplatform>=1.77.0", "pytest-mock", "structlog>=25.2.0", "strenum>=0.4.15; python_version < '3.11'", + "google-cloud-bigquery", + "google-cloud-firestore", ] description = "Genkit Google Cloud Vertex AI Plugin" license = { text = "Apache-2.0" } diff --git a/py/plugins/vertex-ai/src/genkit/plugins/vertex_ai/__init__.py b/py/plugins/vertex-ai/src/genkit/plugins/vertex_ai/__init__.py index c0ac5edf03..c635d21132 100644 --- a/py/plugins/vertex-ai/src/genkit/plugins/vertex_ai/__init__.py +++ b/py/plugins/vertex-ai/src/genkit/plugins/vertex_ai/__init__.py @@ -26,6 +26,7 @@ from genkit.plugins.vertex_ai.gemini import GeminiVersion from genkit.plugins.vertex_ai.imagen import ImagenOptions, ImagenVersion from genkit.plugins.vertex_ai.plugin_api import VertexAI, vertexai_name +from genkit.plugins.vertex_ai.vector_search.vector_search import VertexAIVectorSearch def package_name() -> str: @@ -46,4 +47,5 @@ def package_name() -> str: GeminiVersion.__name__, ImagenVersion.__name__, ImagenOptions.__name__, + VertexAIVectorSearch.__name__, ] diff --git a/py/plugins/vertex-ai/src/genkit/plugins/vertex_ai/models/retriever.py b/py/plugins/vertex-ai/src/genkit/plugins/vertex_ai/models/retriever.py new file mode 100644 index 0000000000..23e6644d95 --- /dev/null +++ b/py/plugins/vertex-ai/src/genkit/plugins/vertex_ai/models/retriever.py @@ -0,0 +1,345 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +import json +from abc import ABC, abstractmethod +from collections.abc import Callable +from typing import Any + +import structlog +from google.cloud import bigquery, firestore +from google.cloud.aiplatform_v1 import FindNeighborsRequest, IndexDatapoint, Neighbor +from pydantic import BaseModel, Field, ValidationError + +from genkit.ai import Genkit +from genkit.blocks.document import Document +from genkit.core.typing import Embedding +from genkit.types import ActionRunContext, RetrieverRequest, RetrieverResponse + +logger = structlog.get_logger(__name__) + +DEFAULT_LIMIT_NEIGHBORS: int = 3 + + +class DocRetriever(ABC): + """Abstract base class for Vertex AI Vector Search document retrieval. + + This class outlines the core workflow for retrieving relevant documents. + It is not intended to be instantiated directly. Subclasses must implement + the abstract methods to provide concrete retrieval logic depending on the + storage technology used. + + Attributes: + ai: The Genkit instance. + name: The name of this retriever instance. + _match_service_client_generator: Callable that builds the match service client. + embedder: The name of the embedder to use for generating embeddings. + embedder_options: Options to pass to the embedder. + """ + def __init__( + self, + ai: Genkit, + name: str, + match_service_client_generator: Callable, + embedder: str, + embedder_options: dict[str, Any] | None = None, + ) -> None: + """Initializes the DocRetriever. + + Args: + ai: The Genkit application instance. + name: The name of this retriever instance.
+ match_service_client_generator: The Vertex AI Matching Engine client. + embedder: The name of the embedder to use for generating embeddings. + Already added plugin prefix. + embedder_options: Optional dictionary of options to pass to the embedder. + """ + self.ai = ai + self.name = name + self.embedder = embedder + self.embedder_options = embedder_options or {} + self._match_service_client_generator = match_service_client_generator + + async def retrieve(self, request: RetrieverRequest, _: ActionRunContext) -> RetrieverResponse: + """Retrieves documents based on a given query. + + Args: + request: The retrieval request containing the query. + _: The ActionRunContext (unused in this method). + + Returns: + A RetrieverResponse object containing the retrieved documents. + """ + document = Document.from_document_data(document_data=request.query) + + embeddings = await self.ai.embed( + embedder=self.embedder, + documents=[document], + options=self.embedder_options, + ) + + limit_neighbors = DEFAULT_LIMIT_NEIGHBORS + if isinstance(request.options, dict) and request.options.get('limit') is not None: + limit_neighbors = request.options.get('limit') + + docs = await self._get_closest_documents( + request=request, + top_k=limit_neighbors, + query_embeddings=embeddings.embeddings[0], + ) + + return RetrieverResponse(documents=docs) + + async def _get_closest_documents( + self, request: RetrieverRequest, top_k: int, query_embeddings: Embedding + ) -> list[Document]: + """Retrieves the closest documents from the vector search index based on query embeddings. + + Args: + request: The retrieval request containing the query and metadata. + top_k: The number of nearest neighbors to retrieve. + query_embeddings: The embedding of the query. + + Returns: + A list of Document objects representing the closest documents. + + Raises: + AttributeError: If the request does not contain the necessary + index endpoint path in its metadata. 
+ """ + metadata = request.query.metadata + if not metadata or any(key not in metadata for key in ('api_endpoint', 'index_endpoint_path', 'deployed_index_id')): + raise AttributeError('Request provides no data about index endpoint path') + + api_endpoint = metadata['api_endpoint'] + index_endpoint_path = metadata['index_endpoint_path'] + deployed_index_id = metadata['deployed_index_id'] + + client_options = { + "api_endpoint": api_endpoint + } + + vector_search_client = self._match_service_client_generator( + client_options=client_options, + ) + + nn_request = FindNeighborsRequest( + index_endpoint=index_endpoint_path, + deployed_index_id=deployed_index_id, + queries=[ + FindNeighborsRequest.Query( + datapoint=IndexDatapoint(feature_vector=query_embeddings.embedding), + neighbor_count=top_k, + ) + ], + ) + + response = await vector_search_client.find_neighbors(request=nn_request) + + return await self._retrieve_neighbours_data_from_db(neighbours=response.nearest_neighbors[0].neighbors) + + @abstractmethod + async def _retrieve_neighbours_data_from_db(self, neighbours: list[Neighbor]) -> list[Document]: + """Retrieves document data from the database based on neighbor information. + + This method must be implemented by subclasses to define how document + data is fetched from the database using the provided neighbor information. + + Args: + neighbours: A list of Neighbor objects representing the nearest neighbors + found in the vector search index. + + Returns: + A list of Document objects containing the data for the retrieved documents. + """ + raise NotImplementedError + + +class BigQueryRetriever(DocRetriever): + """Retrieves documents from a BigQuery table. + + This class extends DocRetriever to fetch document data from a specified BigQuery + dataset and table. It constructs a query to retrieve documents based on the IDs + obtained from nearest neighbor search results. + + Attributes: + bq_client: The BigQuery client to use for querying. + dataset_id: The ID of the BigQuery dataset.
+ table_id: The ID of the BigQuery table. + """ + def __init__( + self, bq_client: bigquery.Client, dataset_id: str, table_id: str, *args, **kwargs, + ) -> None: + """Initializes the BigQueryRetriever. + + Args: + bq_client: The BigQuery client to use for querying. + dataset_id: The ID of the BigQuery dataset. + table_id: The ID of the BigQuery table. + *args: Additional positional arguments to pass to the parent class. + **kwargs: Additional keyword arguments to pass to the parent class. + """ + super().__init__(*args, **kwargs) + self.bq_client = bq_client + self.dataset_id = dataset_id + self.table_id = table_id + + async def _retrieve_neighbours_data_from_db(self, neighbours: list[Neighbor]) -> list[Document]: + """Retrieves document data from the BigQuery table for the given neighbors. + + Constructs and executes a BigQuery query to fetch document data based on + the IDs obtained. Handles potential errors during query execution and + document parsing. + + Args: + neighbours: A list of Neighbor objects representing the nearest neighbors. + Each neighbor should contain a datapoint with a datapoint_id. + + Returns: + A list of Document objects containing the retrieved document data. + Returns an empty list if no IDs are found in the neighbors or if the + query fails. 
+ """ + ids = [ + n.datapoint.datapoint_id + for n in neighbours + if n.datapoint and n.datapoint.datapoint_id + ] + + distance_by_id = { + n.datapoint.datapoint_id: n.distance + for n in neighbours + if n.datapoint and n.datapoint.datapoint_id + } + + if not ids: + return [] + + query = f""" + SELECT * FROM `{self.dataset_id}.{self.table_id}` + WHERE id IN UNNEST(@ids) + """ + + job_config = bigquery.QueryJobConfig( + query_parameters=[bigquery.ArrayQueryParameter('ids', 'STRING', ids)], + ) + + try: + query_job = self.bq_client.query(query, job_config=job_config) + rows = query_job.result() + except Exception as e: + await logger.aerror('Failed to execute BigQuery query: %s', e) + return [] + + documents: list[Document] = [] + + for row in rows: + try: + doc_id = row['id'] + + content = row['content'] + content = json.dumps(content) if isinstance(content, dict) else str(content) + + metadata = row.get('metadata') or {} + metadata['id'] = doc_id + metadata['distance'] = distance_by_id[doc_id] + + documents.append(Document.from_text(content, metadata)) + except Exception as error: + doc_id = row.get('id', '') + await logger.awarning(f'Failed to parse document data for document with ID {doc_id}: {error}') + + return documents + + +class FirestoreRetriever(DocRetriever): + """Retrieves documents from a Firestore collection. + + This class extends DocRetriever to fetch document data from a specified Firestore + collection. It retrieves documents based on IDs obtained from nearest neighbor + search results. + + Attributes: + db: The Firestore client. + collection_name: The name of the Firestore collection. + """ + def __init__( + self, firestore_client: firestore.AsyncClient, collection_name: str, *args, **kwargs, + ) -> None: + """Initializes the FirestoreRetriever. + + Args: + firestore_client: The Firestore client to use for querying. + collection_name: The name of the Firestore collection.
+ *args: Additional positional arguments to pass to the parent class. + **kwargs: Additional keyword arguments to pass to the parent class. + """ + super().__init__(*args, **kwargs) + self.db = firestore_client + self.collection_name = collection_name + + async def _retrieve_neighbours_data_from_db(self, neighbours: list[Neighbor]) -> list[Document]: + """Retrieves document data from the Firestore collection for the given neighbors. + + Fetches each neighbor's document from Firestore by its datapoint ID, awaiting the + async client for every lookup. Documents that fail validation are logged and skipped. + + Args: + neighbours: A list of Neighbor objects representing the nearest neighbors. + Each neighbor should contain a datapoint with a datapoint_id. + + Returns: + A list of Document objects containing the retrieved document data. + Returns an empty list if no documents are found for the given IDs. + """ + documents: list[Document] = [] + + for neighbor in neighbours: + doc_ref = self.db.collection(self.collection_name).document(document_id=neighbor.datapoint.datapoint_id) + doc_snapshot = await doc_ref.get() + + if doc_snapshot.exists: + doc_data = doc_snapshot.to_dict() or {} + + content = doc_data.get('content') + content = json.dumps(content) if isinstance(content, dict) else str(content) + + metadata = doc_data.get('metadata') or {} + metadata['id'] = neighbor.datapoint.datapoint_id + metadata['distance'] = neighbor.distance + + try: + documents.append( + Document.from_text( + content, + metadata, + ) + ) + except ValidationError as e: + await logger.awarning( + f'Failed to parse document data for ID {neighbor.datapoint.datapoint_id}: {e}' + ) + + return documents + + +class RetrieverOptionsSchema(BaseModel): + """Schema for retriever options. + + Attributes: + limit: Number of documents to retrieve.
+ """ + limit: int | None = Field(title='Number of documents to retrieve', default=None) diff --git a/py/plugins/vertex-ai/src/genkit/plugins/vertex_ai/vector_search/vector_search.py b/py/plugins/vertex-ai/src/genkit/plugins/vertex_ai/vector_search/vector_search.py new file mode 100644 index 0000000000..7126ca65b4 --- /dev/null +++ b/py/plugins/vertex-ai/src/genkit/plugins/vertex_ai/vector_search/vector_search.py @@ -0,0 +1,104 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +from functools import partial +from typing import Any + +import structlog +from google.auth.credentials import Credentials +from google.cloud import aiplatform_v1 + +from genkit.ai import GenkitRegistry, Plugin +from genkit.plugins.vertex_ai import vertexai_name +from genkit.plugins.vertex_ai.models.retriever import ( + DocRetriever, + RetrieverOptionsSchema, +) + +logger = structlog.get_logger(__name__) + + +class VertexAIVectorSearch(Plugin): + """A plugin for integrating VertexAI Vector Search. + + This class registers VertexAI Vector Stores within a registry, + and allows interaction to retrieve similar documents. 
+ """ + + name: str = 'vertexAIVectorSearch' + + def __init__( + self, + retriever: type[DocRetriever], + retriever_extra_args: dict[str, Any] | None = None, + credentials: Credentials | None = None, + project: str | None = None, + location: str | None = 'us-central1', + embedder: str | None = None, + embedder_options: dict[str, Any] | None = None, + ) -> None: + """Initializes the VertexAIVectorSearch plugin. + + Args: + retriever: The DocRetriever subclass (the class itself, not an instance) to use. + retriever_extra_args: Optional dictionary of extra arguments to pass to the + retriever's constructor. + credentials: Optional Google Cloud credentials to use. If not provided, + the default application credentials will be used. + project: Optional Google Cloud project ID. If not provided, it will be + inferred from the credentials. + location: Optional Google Cloud location (region). Defaults to + 'us-central1'. + embedder: Optional identifier for the embedding model to use. + embedder_options: Optional dictionary of options to pass to the embedding + model. + """ + self.project = project + self.location = location + + self.embedder = embedder + self.embedder_options = embedder_options + + self.retriever_cls = retriever + self.retriever_extra_args = retriever_extra_args or {} + + self._match_service_client_generator = partial( + aiplatform_v1.MatchServiceAsyncClient, + credentials=credentials, + ) + + def initialize(self, ai: GenkitRegistry) -> None: + """Initialize plugin with the retriever specified. + + Register actions with the registry making them available for use in the Genkit framework. + + Args: + ai: The registry to register actions with.
+ """ + retriever = self.retriever_cls( + ai=ai, + name=self.name, + match_service_client_generator=self._match_service_client_generator, + embedder=self.embedder, + embedder_options=self.embedder_options, + **self.retriever_extra_args, + ) + + return ai.define_retriever( + name=vertexai_name(self.name), + config_schema=RetrieverOptionsSchema, + fn=retriever.retrieve, + ) diff --git a/py/samples/vertex-ai-vector-search-bigquery/LICENSE b/py/samples/vertex-ai-vector-search-bigquery/LICENSE new file mode 100644 index 0000000000..2205396735 --- /dev/null +++ b/py/samples/vertex-ai-vector-search-bigquery/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. 
+ + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." 
+ + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright 2025 Google LLC + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/py/samples/vertex-ai-vector-search-bigquery/README.md b/py/samples/vertex-ai-vector-search-bigquery/README.md new file mode 100644 index 0000000000..c7a43ca694 --- /dev/null +++ b/py/samples/vertex-ai-vector-search-bigquery/README.md @@ -0,0 +1,32 @@ +# Vertex AI - Vector Search BigQuery + +An example demonstrating the use of the Vector Search API with the BigQuery retriever for Vertex AI. + +## Setup environment + +1. Install [GCP CLI](https://cloud.google.com/sdk/docs/install). +2. Run the following command to authenticate with Vertex AI. +```bash +gcloud auth application-default login +``` +3. Set the following env vars to run the sample: +``` +export LOCATION='' +export PROJECT_ID='' +export BIGQUERY_DATASET_NAME='' +export BIGQUERY_TABLE_NAME='' +export VECTOR_SEARCH_DEPLOYED_INDEX_ID='' +export VECTOR_SEARCH_INDEX_ENDPOINT_PATH='' +export VECTOR_SEARCH_API_ENDPOINT='' +``` +4. Run the sample. + +## Run the sample + +```bash +genkit start -- uv run src/sample.py +``` + +## Set up env for sample +The file `setup_env.py` contains code that helps you create the BigQuery dataset and a table with the expected schema, embed the content of the table, and push the embeddings to the Vertex AI Vector Search index. +This index must be created with its update method set to `stream`. The Vertex AI index is expected to already exist.
diff --git a/py/samples/vertex-ai-vector-search-bigquery/pyproject.toml b/py/samples/vertex-ai-vector-search-bigquery/pyproject.toml new file mode 100644 index 0000000000..7eae7e480c --- /dev/null +++ b/py/samples/vertex-ai-vector-search-bigquery/pyproject.toml @@ -0,0 +1,39 @@ +[project] +authors = [{ name = "Google" }] +classifiers = [ + "Development Status :: 3 - Alpha", + "Environment :: Console", + "Environment :: Web Environment", + "Intended Audience :: Developers", + "Operating System :: OS Independent", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: Software Development :: Libraries", +] +dependencies = [ + "genkit", + "genkit-plugin-vertex-ai", + "pydantic>=2.10.5", + "structlog>=25.2.0", + "google-cloud-bigquery", + "strenum>=0.4.15; python_version < '3.11'", +] +description = "An example demonstrating the use Vector Search API with BigQuery retriever for Vertex AI" +license = { text = "Apache-2.0" } +name = "vertex-ai-vector-search-bigquery" +readme = "README.md" +requires-python = ">=3.10" +version = "0.1.0" + +[build-system] +build-backend = "hatchling.build" +requires = ["hatchling"] + +[tool.hatch.build.targets.wheel] +packages = ["src/sample"] diff --git a/py/samples/vertex-ai-vector-search-bigquery/src/sample.py b/py/samples/vertex-ai-vector-search-bigquery/src/sample.py new file mode 100644 index 0000000000..39c994a261 --- /dev/null +++ b/py/samples/vertex-ai-vector-search-bigquery/src/sample.py @@ -0,0 +1,139 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +import os +import time + +import structlog +from google.cloud import aiplatform, bigquery +from pydantic import BaseModel + +from genkit.ai import Genkit +from genkit.blocks.document import ( + Document, +) +from genkit.plugins.vertex_ai import ( + EmbeddingModels, + VertexAI, + VertexAIVectorSearch, + vertexai_name, +) +from genkit.plugins.vertex_ai.models.retriever import BigQueryRetriever + +LOCATION = os.getenv('LOCATION') +PROJECT_ID = os.getenv('PROJECT_ID') +EMBEDDING_MODEL = EmbeddingModels.TEXT_EMBEDDING_004_ENG + +BIGQUERY_DATASET_NAME = os.getenv('BIGQUERY_DATASET_NAME') +BIGQUERY_TABLE_NAME = os.getenv('BIGQUERY_TABLE_NAME') + +VECTOR_SEARCH_DEPLOYED_INDEX_ID = os.getenv('VECTOR_SEARCH_DEPLOYED_INDEX_ID') +VECTOR_SEARCH_INDEX_ENDPOINT_PATH = os.getenv('VECTOR_SEARCH_INDEX_ENDPOINT_PATH') +VECTOR_SEARCH_API_ENDPOINT = os.getenv('VECTOR_SEARCH_API_ENDPOINT') + +bq_client = bigquery.Client(project=PROJECT_ID) +aiplatform.init(project=PROJECT_ID, location=LOCATION) + +logger = structlog.get_logger(__name__) + +ai = Genkit( + plugins=[ + VertexAI(), + VertexAIVectorSearch( + retriever=BigQueryRetriever, + retriever_extra_args={ + 'bq_client': bq_client, + 'dataset_id': BIGQUERY_DATASET_NAME, + 'table_id': BIGQUERY_TABLE_NAME, + }, + embedder=vertexai_name(EMBEDDING_MODEL), + embedder_options={ + 'task': 'RETRIEVAL_DOCUMENT', + 'output_dimensionality': 128, + }, + ), + ] +) + + +class QueryFlowInputSchema(BaseModel): + """Input schema.""" + query: str + k: int + + +class
QueryFlowOutputSchema(BaseModel): + """Output schema.""" + result: list[dict] + length: int + time: int + + +@ai.flow(name='queryFlow') +async def query_flow(_input: QueryFlowInputSchema) -> QueryFlowOutputSchema: + """Executes a vector search with VertexAI Vector Search.""" + start_time = time.time() + + query_document = Document.from_text(text=_input.query) + query_document.metadata = { + 'api_endpoint': VECTOR_SEARCH_API_ENDPOINT, + 'index_endpoint_path': VECTOR_SEARCH_INDEX_ENDPOINT_PATH, + 'deployed_index_id': VECTOR_SEARCH_DEPLOYED_INDEX_ID, + } + + options = { + 'limit': 10, + } + + result: list[Document] = await ai.retrieve( + retriever=vertexai_name('vertexAIVectorSearch'), + query=query_document, + options=options, + ) + + end_time = time.time() + + duration = int(end_time - start_time) + + result_data = [] + for doc in result.documents: + result_data.append({ + 'id': doc.metadata.get('id'), + 'text': doc.content[0].root.text, + 'distance': doc.metadata.get('distance'), + }) + + result_data = sorted(result_data, key=lambda x: x['distance']) + + return QueryFlowOutputSchema( + result=result_data, + length=len(result_data), + time=duration, + ) + + +async def main() -> None: + """Main function.""" + query_input = QueryFlowInputSchema( + query="Content for doc", + k=3, + ) + + await logger.ainfo(await query_flow(query_input)) + + +if __name__ == '__main__': + ai.run_main(main()) diff --git a/py/samples/vertex-ai-vector-search-bigquery/src/setup_env.py b/py/samples/vertex-ai-vector-search-bigquery/src/setup_env.py new file mode 100644 index 0000000000..7fc6710c9b --- /dev/null +++ b/py/samples/vertex-ai-vector-search-bigquery/src/setup_env.py @@ -0,0 +1,261 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +import json +import os + +import structlog +from google.cloud import aiplatform, aiplatform_v1, bigquery + +from genkit import types +from genkit.ai import Genkit +from genkit.plugins.vertex_ai import ( + EmbeddingModels, + VertexAI, + VertexAIVectorSearch, + vertexai_name, +) +from genkit.plugins.vertex_ai.models.retriever import BigQueryRetriever + +# Environment Variables +LOCATION = os.getenv('LOCATION') +PROJECT_ID = os.getenv('PROJECT_ID') +EMBEDDING_MODEL = EmbeddingModels.TEXT_EMBEDDING_004_ENG + +BIGQUERY_DATASET_NAME = os.getenv('BIGQUERY_DATASET_NAME') +BIGQUERY_TABLE_NAME = os.getenv('BIGQUERY_TABLE_NAME') + +VECTOR_SEARCH_INDEX_ID = os.getenv('VECTOR_SEARCH_INDEX_ID') + +bq_client = bigquery.Client(project=PROJECT_ID) +aiplatform.init(project=PROJECT_ID, location=LOCATION) + +logger = structlog.get_logger(__name__) + +ai = Genkit( + plugins=[ + VertexAI(), + VertexAIVectorSearch( + retriever=BigQueryRetriever, + retriever_extra_args={ + 'bq_client': bq_client, + 'dataset_id': BIGQUERY_DATASET_NAME, + 'table_id': BIGQUERY_TABLE_NAME, + }, + embedder=vertexai_name(EMBEDDING_MODEL), + embedder_options={'task': 'RETRIEVAL_DOCUMENT'}, + ) + ] +) + + +@ai.flow(name='generateEmbeddings') +async def generate_embeddings(): + """Generates document embeddings and upserts them to the Vertex AI Vector Search index. + + This flow seeds BigQuery with toy documents, reads them back, generates embeddings, + and then upserts these embeddings to the specified Vector Search index.
+ """ + toy_documents = [ + { + "id": "doc1", + "content": {"title": "Document 1", "body": "This is the content of document 1."}, + "metadata": {"author": "Alice", "date": "2024-01-15"}, + }, + { + "id": "doc2", + "content": {"title": "Document 2", "body": "This is the content of document 2."}, + "metadata": {"author": "Bob", "date": "2024-02-20"}, + }, + { + "id": "doc3", + "content": {"title": "Document 3", "body": "Content for doc 3"}, + "metadata": {"author": "Charlie", "date": "2024-03-01"}, + }, + ] + + create_bigquery_dataset_and_table( + PROJECT_ID, LOCATION, BIGQUERY_DATASET_NAME, BIGQUERY_TABLE_NAME, toy_documents, + ) + + results_dict = get_data_from_bigquery( + bq_client=bq_client, + project_id=PROJECT_ID, + dataset_id=BIGQUERY_DATASET_NAME, + table_id=BIGQUERY_TABLE_NAME, + ) + + genkit_documents = [ + types.Document(content=[types.TextPart(text=text)]) + for text in results_dict.values() + ] + + embed_response = await ai.embed( + embedder=vertexai_name(EMBEDDING_MODEL), + documents=genkit_documents, + options={'task': 'RETRIEVAL_DOCUMENT', 'output_dimensionality': 128}, + ) + + embeddings = [emb.embedding for emb in embed_response.embeddings] + logger.debug(f'Generated {len(embeddings)} embeddings, dimension: {len(embeddings[0])}') + + ids = list(results_dict.keys())[:len(embeddings)] + data_embeddings = list(zip(ids, embeddings, strict=True)) + + upsert_data = [(str(doc_id), embedding) for doc_id, embedding in data_embeddings] + upsert_index(PROJECT_ID, LOCATION, VECTOR_SEARCH_INDEX_ID, upsert_data) + + +def create_bigquery_dataset_and_table( + project_id: str, + location: str, + dataset_id: str, + table_id: str, + documents: list[dict[str, object]], +) -> None: + """Creates a BigQuery dataset and table, and inserts documents. + + Args: + project_id: The ID of the Google Cloud project. + location: The location for the BigQuery resources. + dataset_id: The ID of the BigQuery dataset. + table_id: The ID of the BigQuery table.
+ documents: A list of dictionaries, where each dictionary represents a document + with 'id', 'content', and 'metadata' keys. 'content' and 'metadata' + are expected to be JSON serializable. + """ + client = bigquery.Client(project=project_id) + dataset_ref = bigquery.DatasetReference(project_id, dataset_id) + dataset = bigquery.Dataset(dataset_ref) + dataset.location = location + + try: + dataset = client.create_dataset(dataset, exists_ok=True) + logger.debug(f"Dataset {client.project}.{dataset.dataset_id} created.") + except Exception as e: + logger.exception(f"Error creating dataset: {e}") + raise + + schema = [ + bigquery.SchemaField("id", "STRING", mode="REQUIRED"), + bigquery.SchemaField("content", "JSON"), + bigquery.SchemaField("metadata", "JSON"), + ] + + table_ref = dataset_ref.table(table_id) + table = bigquery.Table(table_ref, schema=schema) + try: + table = client.create_table(table, exists_ok=True) + logger.debug(f"Table {table.project}.{table.dataset_id}.{table.table_id} created.") + except Exception as e: + logger.exception(f"Error creating table: {e}") + raise + + rows_to_insert = [ + { + "id": doc["id"], + "content": json.dumps(doc["content"]), + "metadata": json.dumps(doc["metadata"]), + } + for doc in documents + ] + + errors = client.insert_rows_json(table, rows_to_insert) + if errors: + logger.error(f"Errors inserting rows: {errors}") + raise Exception(f"Failed to insert rows: {errors}") + else: + logger.debug(f"Inserted {len(rows_to_insert)} rows into BigQuery.") + + +def get_data_from_bigquery( + bq_client: bigquery.Client, + project_id: str, + dataset_id: str, + table_id: str, +) -> dict[str, str]: + """Retrieves data from a BigQuery table. + + Args: + bq_client: The BigQuery client. + project_id: The ID of the Google Cloud project. + dataset_id: The ID of the BigQuery dataset. + table_id: The ID of the BigQuery table.
+ + Returns: + A dictionary where keys are document IDs and values are JSON strings + representing the document content. + """ + table_ref = bigquery.TableReference.from_string( + f"{project_id}.{dataset_id}.{table_id}" + ) + query = f"SELECT id, content FROM `{table_ref}`" + query_job = bq_client.query(query) + rows = query_job.result() + + results = {row['id']: json.dumps(row['content']) for row in rows} + logger.debug(f'Found {len(results)} rows with different ids into BigQuery.') + + return results + + +def upsert_index( + project_id: str, + region: str, + index_name: str, + data: list[tuple[str, list[float]]], +) -> None: + """Upserts data points to a Vertex AI Index using batch processing. + + Args: + project_id: The ID of your Google Cloud project. + region: The region where the Index is located. + index_name: The name of the Vertex AI Index. + data: A list of tuples, where each tuple contains (id, embedding). + id should be a string, and embedding should be a list of floats. + """ + aiplatform.init(project=project_id, location=region) + + index_client = aiplatform_v1.IndexServiceClient( + client_options={"api_endpoint": f"{region}-aiplatform.googleapis.com"} + ) + + index_path = index_client.index_path( + project=project_id, location=region, index=index_name + ) + + datapoints = [ + aiplatform_v1.IndexDatapoint(datapoint_id=doc_id, feature_vector=embedding) + for doc_id, embedding in data + ] + + logger.debug(f'Attempting to insert {len(datapoints)} rows into Index {index_path}') + + upsert_request = aiplatform_v1.UpsertDatapointsRequest( + index=index_path, datapoints=datapoints + ) + + response = index_client.upsert_datapoints(request=upsert_request) + logger.info(f"Upserted {len(datapoints)} datapoints.
Response: {response}") + + +async def main() -> None: + """Runs the generateEmbeddings flow once and logs its return value.""" + await logger.ainfo(await generate_embeddings()) + + +if __name__ == '__main__': + ai.run_main(main()) diff --git a/py/samples/vertex-ai-vector-search-firestore/LICENSE b/py/samples/vertex-ai-vector-search-firestore/LICENSE new file mode 100644 index 0000000000..2205396735 --- /dev/null +++ b/py/samples/vertex-ai-vector-search-firestore/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types.
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright 2025 Google LLC + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/py/samples/vertex-ai-vector-search-firestore/README.md b/py/samples/vertex-ai-vector-search-firestore/README.md new file mode 100644 index 0000000000..591c705d9a --- /dev/null +++ b/py/samples/vertex-ai-vector-search-firestore/README.md @@ -0,0 +1,27 @@ +# Vertex AI - Vector Search Firestore + +An example demonstrating the use of the Vector Search API with the Firestore retriever for Vertex AI. + +## Setup environment + +1. Install [GCP CLI](https://cloud.google.com/sdk/docs/install). +2. Run the following command to authenticate with Vertex AI. +```bash +gcloud auth application-default login +``` +3. Set the following environment variables to run the sample: +``` +export LOCATION='' +export PROJECT_ID='' +export FIRESTORE_COLLECTION='' +export VECTOR_SEARCH_DEPLOYED_INDEX_ID='' +export VECTOR_SEARCH_INDEX_ENDPOINT_PATH='' +export VECTOR_SEARCH_API_ENDPOINT='' +``` +4. Run the sample.
+ +## Run the sample + +```bash +genkit start -- uv run src/sample.py +``` diff --git a/py/samples/vertex-ai-vector-search-firestore/pyproject.toml b/py/samples/vertex-ai-vector-search-firestore/pyproject.toml new file mode 100644 index 0000000000..3413399903 --- /dev/null +++ b/py/samples/vertex-ai-vector-search-firestore/pyproject.toml @@ -0,0 +1,39 @@ +[project] +authors = [{ name = "Google" }] +classifiers = [ + "Development Status :: 3 - Alpha", + "Environment :: Console", + "Environment :: Web Environment", + "Intended Audience :: Developers", + "Operating System :: OS Independent", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: Software Development :: Libraries", +] +dependencies = [ + "genkit", + "genkit-plugin-vertex-ai", + "pydantic>=2.10.5", + "structlog>=25.2.0", + "google-cloud-firestore", + "strenum>=0.4.15; python_version < '3.11'", +] +description = "An example demonstrating the use of the Vector Search API with the Firestore retriever for Vertex AI" +license = { text = "Apache-2.0" } +name = "vertex-ai-vector-search-firestore" +readme = "README.md" +requires-python = ">=3.10" +version = "0.1.0" + +[build-system] +build-backend = "hatchling.build" +requires = ["hatchling"] + +[tool.hatch.build.targets.wheel] +packages = ["src/sample"] diff --git a/py/samples/vertex-ai-vector-search-firestore/src/sample.py b/py/samples/vertex-ai-vector-search-firestore/src/sample.py new file mode 100644 index 0000000000..a8bd67f563 --- /dev/null +++ b/py/samples/vertex-ai-vector-search-firestore/src/sample.py @@ -0,0 +1,135 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not
use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +import os +import time + +import structlog +from google.cloud import aiplatform, firestore +from pydantic import BaseModel + +from genkit.ai import Genkit +from genkit.blocks.document import Document +from genkit.plugins.vertex_ai import ( + EmbeddingModels, + VertexAI, + VertexAIVectorSearch, + vertexai_name, +) +from genkit.plugins.vertex_ai.models.retriever import FirestoreRetriever + +LOCATION = os.getenv('LOCATION') +PROJECT_ID = os.getenv('PROJECT_ID') +EMBEDDING_MODEL = EmbeddingModels.TEXT_EMBEDDING_004_ENG + +FIRESTORE_COLLECTION = os.getenv('FIRESTORE_COLLECTION') + +VECTOR_SEARCH_DEPLOYED_INDEX_ID = os.getenv('VECTOR_SEARCH_DEPLOYED_INDEX_ID') +VECTOR_SEARCH_INDEX_ENDPOINT_PATH = os.getenv('VECTOR_SEARCH_INDEX_ENDPOINT_PATH') +VECTOR_SEARCH_API_ENDPOINT = os.getenv('VECTOR_SEARCH_API_ENDPOINT') + +firestore_client = firestore.Client(project=PROJECT_ID) +aiplatform.init(project=PROJECT_ID, location=LOCATION) + +logger = structlog.get_logger(__name__) + +ai = Genkit( + plugins=[ + VertexAI(), + VertexAIVectorSearch( + retriever=FirestoreRetriever, + retriever_extra_args={ + 'firestore_client': firestore_client, + 'collection_name': FIRESTORE_COLLECTION, + }, + embedder=vertexai_name(EMBEDDING_MODEL), + embedder_options={ + 'task': 'RETRIEVAL_DOCUMENT', + 'output_dimensionality': 128, + }, + ), + ] +) + + +class QueryFlowInputSchema(BaseModel): + """Input schema: the query text and the maximum number of neighbors (k) to retrieve.""" + query: str + k: int + + +class
QueryFlowOutputSchema(BaseModel): + """Output schema: matched documents sorted by distance, their count, and elapsed whole seconds.""" + result: list[dict] + length: int + time: int + + +@ai.flow(name='queryFlow') +async def query_flow(_input: QueryFlowInputSchema) -> QueryFlowOutputSchema: + """Retrieves up to `k` documents for the query via Vertex AI Vector Search, sorted by distance.""" + start_time = time.time() + + query_document = Document.from_text(text=_input.query) + query_document.metadata = { + 'api_endpoint': VECTOR_SEARCH_API_ENDPOINT, + 'index_endpoint_path': VECTOR_SEARCH_INDEX_ENDPOINT_PATH, + 'deployed_index_id': VECTOR_SEARCH_DEPLOYED_INDEX_ID, + } + + options = { + 'limit': _input.k, + } + + result = await ai.retrieve( + retriever=vertexai_name('vertexAIVectorSearch'), + query=query_document, + options=options, + ) + + end_time = time.time() + + duration = int(end_time - start_time) + + result_data = [] + for doc in result.documents: + result_data.append({ + 'id': doc.metadata.get('id'), + 'text': doc.content[0].root.text, + 'distance': doc.metadata.get('distance'), + }) + + result_data = sorted(result_data, key=lambda x: x['distance']) + + return QueryFlowOutputSchema( + result=result_data, + length=len(result_data), + time=duration, + ) + + +async def main() -> None: + """Runs queryFlow once with a sample query and logs the result.""" + query_input = QueryFlowInputSchema( + query="Content for doc", + k=3, + ) + + await logger.ainfo(await query_flow(query_input)) + + +if __name__ == '__main__': + ai.run_main(main()) diff --git a/py/uv.lock b/py/uv.lock index 0585aac076..776dfe058b 100644 --- a/py/uv.lock +++ b/py/uv.lock @@ -36,6 +36,8 @@ members = [ "ollama-simple-embed", "short-n-long", "tool-interrupts", + "vertex-ai-vector-search-bigquery", + "vertex-ai-vector-search-firestore", ] [[package]] @@ -961,7 +963,7 @@ wheels = [ [[package]] name = "genkit" -version = "0.3.2" +version = "0.3.1" source = { editable = "packages/genkit" } dependencies = [ { name = "anyio" }, @@ -1158,6 +1160,9 @@ source = { editable = "plugins/vertex-ai" } dependencies = [ { name = "genkit" }, { name =
"google-cloud-aiplatform" }, + { name = "google-cloud-bigquery" }, + { name = "google-cloud-firestore" }, + { name = "google-genai" }, { name = "pytest-mock" }, { name = "strenum", marker = "python_full_version < '3.11'" }, { name = "structlog" }, @@ -1167,6 +1172,9 @@ dependencies = [ requires-dist = [ { name = "genkit", editable = "packages/genkit" }, { name = "google-cloud-aiplatform", specifier = ">=1.77.0" }, + { name = "google-cloud-bigquery" }, + { name = "google-cloud-firestore" }, + { name = "google-genai", specifier = ">=1.7.0" }, { name = "pytest-mock" }, { name = "strenum", marker = "python_full_version < '3.11'", specifier = ">=0.4.15" }, { name = "structlog", specifier = ">=25.2.0" }, @@ -4617,6 +4625,52 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/63/9a/0962b05b308494e3202d3f794a6e85abe471fe3cafdbcf95c2e8c713aabd/uvloop-0.21.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:a5c39f217ab3c663dc699c04cbd50c13813e31d917642d459fdcec07555cc553", size = 4660018 }, ] +[[package]] +name = "vertex-ai-vector-search-bigquery" +version = "0.1.0" +source = { editable = "samples/vertex-ai-vector-search-bigquery" } +dependencies = [ + { name = "genkit" }, + { name = "genkit-plugin-vertex-ai" }, + { name = "google-cloud-bigquery" }, + { name = "pydantic" }, + { name = "strenum", marker = "python_full_version < '3.11'" }, + { name = "structlog" }, +] + +[package.metadata] +requires-dist = [ + { name = "genkit", editable = "packages/genkit" }, + { name = "genkit-plugin-vertex-ai", editable = "plugins/vertex-ai" }, + { name = "google-cloud-bigquery" }, + { name = "pydantic", specifier = ">=2.10.5" }, + { name = "strenum", marker = "python_full_version < '3.11'", specifier = ">=0.4.15" }, + { name = "structlog", specifier = ">=25.2.0" }, +] + +[[package]] +name = "vertex-ai-vector-search-firestore" +version = "0.1.0" +source = { editable = "samples/vertex-ai-vector-search-firestore" } +dependencies = [ + { name = "genkit" }, + { name = 
"genkit-plugin-vertex-ai" }, + { name = "google-cloud-firestore" }, + { name = "pydantic" }, + { name = "strenum", marker = "python_full_version < '3.11'" }, + { name = "structlog" }, +] + +[package.metadata] +requires-dist = [ + { name = "genkit", editable = "packages/genkit" }, + { name = "genkit-plugin-vertex-ai", editable = "plugins/vertex-ai" }, + { name = "google-cloud-firestore" }, + { name = "pydantic", specifier = ">=2.10.5" }, + { name = "strenum", marker = "python_full_version < '3.11'", specifier = ">=0.4.15" }, + { name = "structlog", specifier = ">=25.2.0" }, +] + [[package]] name = "virtualenv" version = "20.30.0"