diff --git a/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb b/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb index 3ff7a606236a..a8c3544c4e63 100644 --- a/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb +++ b/examples/notebooks/beam-ml/alloydb_product_catalog_embeddings.ipynb @@ -29,6 +29,11 @@ "# under the License" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [] + }, { "cell_type": "markdown", "metadata": { @@ -158,14 +163,14 @@ }, { "cell_type": "code", - "source": [ - "!pip show apache-beam" - ], + "execution_count": null, "metadata": { "id": "2FlMPmA0IUuv" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "!pip show apache-beam" + ] }, { "cell_type": "markdown", @@ -205,6 +210,25 @@ "Replace these placeholder values with your actual AlloyDB connection details:" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Creating the AlloyDB Database\n", + "\n", + "Before running this notebook, ensure that the database you plan to use already exists in your AlloyDB instance.\n", + "\n", + "For example, you can create one with the following PostgreSQL statement:\n", + "\n", + "`CREATE DATABASE my_database_name;`\n", + "\n", + "If the database does not exist, you may encounter an error such as:\n", + "\n", + "`database \"\" does not exist`\n", + "\n", + "Make sure that the value used in `DB_NAME` below matches the database you created in your AlloyDB instance." + ] + }, { "cell_type": "code", "execution_count": null, @@ -226,28 +250,28 @@ }, { "cell_type": "markdown", + "metadata": { + "id": "doK840yZZNdl" + }, "source": [ "## Authenticate to Google Cloud\n", "\n", "To connect to the AlloyDB instance via the language connector, we authenticate with Google Cloud."
- ], - "metadata": { - "id": "doK840yZZNdl" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "CLM12rbiZHTN" + }, + "outputs": [], "source": [ "import sys\n", "if 'google.colab' in sys.modules:\n", " from google.colab import auth\n", " auth.authenticate_user(project_id=PROJECT_ID)" - ], - "metadata": { - "id": "CLM12rbiZHTN" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "code", @@ -2325,6 +2349,9 @@ }, { "cell_type": "markdown", + "metadata": { + "id": "yv4Rd1ZvsB_M" + }, "source": [ "## Streaming Embeddings Updates from PubSub\n", "\n", @@ -2334,13 +2361,15 @@ "\n", "### Authenticate with Google Cloud\n", "To use PubSub, we authenticate with Google Cloud.\n" - ], - "metadata": { - "id": "yv4Rd1ZvsB_M" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "VCqJmaznt1nS" + }, + "outputs": [], "source": [ "# Replace with a valid Google Cloud project ID.\n", "PROJECT_ID = '' # @param {type:'string'}\n", @@ -2349,26 +2378,26 @@ "if 'google.colab' in sys.modules:\n", " from google.colab import auth\n", " auth.authenticate_user(project_id=PROJECT_ID)" - ], - "metadata": { - "id": "VCqJmaznt1nS" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", + "metadata": { + "id": "2FsoFaugtsln" + }, "source": [ "### Setting Up PubSub Resources\n", "\n", "First, let's set up the necessary PubSub topics and subscriptions:" - ], - "metadata": { - "id": "2FsoFaugtsln" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "nqMe0Brlt7Bk" + }, + "outputs": [], "source": [ "from google.cloud import pubsub_v1\n", "from google.api_core.exceptions import AlreadyExists\n", @@ -2385,26 +2414,26 @@ " print(f\"Created topic: {topic.name}\")\n", "except AlreadyExists:\n", " print(f\"Topic {topic_path} already exists.\")" - ], - "metadata": { - "id": "nqMe0Brlt7Bk" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown",
+ "metadata": { + "id": "07ZFeGbMuFj_" + }, "source": [ "### Create AlloyDB Table for Streaming Updates\n", "\n", "Next, create a table to store the embedded data." - ], - "metadata": { - "id": "07ZFeGbMuFj_" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "3Xc70uV_uJy5" + }, + "outputs": [], "source": [ "table_name = \"streaming_product_embeddings\"\n", "table_schema = \"\"\"\n", @@ -2414,27 +2443,25 @@ " metadata JSONB,\n", " created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n", "\"\"\"" - ], - "metadata": { - "id": "3Xc70uV_uJy5" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "code", - "source": [ - "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME, table_name,table_schema, DB_USER, DB_PASSWORD)\n", - "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name, DB_USER, DB_PASSWORD)" - ], + "execution_count": null, "metadata": { "id": "8HPhUfAuorBP" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "setup_alloydb_table_sqlalchemy(INSTANCE_URI, DB_NAME, table_name, table_schema, DB_USER, DB_PASSWORD)\n", + "test_alloydb_connection_sqlalchemy(INSTANCE_URI, DB_NAME, table_name, DB_USER, DB_PASSWORD)" + ] }, { "cell_type": "markdown", + "metadata": { + "id": "LSriDxtsn1wH" + }, "source": [ "### Configure the Pipeline options\n", "To run the pipeline on Dataflow we need\n", @@ -2443,13 +2470,15 @@ "- AlloyDB [private IP](https://cloud.google.com/alloydb/docs/private-ip#:~:text=Using%20private%20IP%20addresses%20keeps,instance%20and%20potential%20attack%20surface.) address to which the Dataflow worker VMs have access. 
There are multiple ways to [connect](https://cloud.google.com/alloydb/docs/connection-overview) to your AlloyDB instance from Dataflow, including\n", " - Setting up [Private services access](https://cloud.google.com/alloydb/docs/about-private-services-access)\n", " - Setting up [Private service connect](https://cloud.google.com/alloydb/docs/about-private-service-connect)\n" - ], - "metadata": { - "id": "LSriDxtsn1wH" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "kR0x7vzTrUlZ" + }, + "outputs": [], "source": [ "from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, SetupOptions, GoogleCloudOptions, WorkerOptions\n", "\n", @@ -2493,15 +2522,13 @@ "\n", "# options.view_as(SetupOptions).save_main_session = True\n", "options.view_as(SetupOptions).requirements_file = \"./requirements.txt\"\n" - ], - "metadata": { - "id": "kR0x7vzTrUlZ" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", + "metadata": { + "id": "gMKuccfHoDki" + }, "source": [ "### Provide additional Python dependencies to be installed on Worker VMs\n", "\n", @@ -2509,25 +2536,25 @@ "\n", "See [Managing Python Pipeline Dependencies](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/) for more details.\n", "\n" - ], - "metadata": { - "id": "gMKuccfHoDki" - } + ] }, { "cell_type": "code", - "source": [ - "!echo \"sentence-transformers\" > ./requirements.txt\n", - "!cat ./requirements.txt" - ], + "execution_count": null, "metadata": { "id": "RTGoA0SmoEvm" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "!echo \"sentence-transformers\" > ./requirements.txt\n", + "!cat ./requirements.txt" + ] }, { "cell_type": "markdown", + "metadata": { + "id": "eU0Sn19nqzLM" + }, "source": [ "### Configure and Run Pipeline\n", "\n", @@ -2538,13 +2565,15 @@ "3. **Transformation**: Converts JSON messages to Chunk objects for embedding\n", "4. 
**ML Processing**: Generates embeddings using HuggingFace models\n", "5. **Sink**: Writes results to AlloyDB with conflict resolution" - ], - "metadata": { - "id": "eU0Sn19nqzLM" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "w2pmJn5fqXHx" + }, + "outputs": [], "source": [ "import apache_beam as beam\n", "import tempfile\n", @@ -2601,15 +2630,13 @@ " )\n", ")\n", "\n" - ], - "metadata": { - "id": "w2pmJn5fqXHx" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", + "metadata": { + "id": "r7nJdc09vs98" + }, "source": [ "### Create Publisher Subprocess\n", "The publisher simulates real-time product updates by:\n", @@ -2617,13 +2644,16 @@ "- Modifying prices and descriptions to represent changes\n", "- Adding timestamps to track update times\n", "- Running for 25 minutes in the background while our pipeline processes the data" - ], - "metadata": { - "id": "r7nJdc09vs98" - } + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "cellView": "form", + "id": "C9Bf0Nb0vY7r" + }, + "outputs": [], "source": [ "#@title Define PubSub publisher function\n", "import threading\n", @@ -2692,25 +2722,24 @@ "\n", " logger.info(\"Finished publishing all messages.\")\n", " file_handler.flush()" - ], - "metadata": { - "id": "C9Bf0Nb0vY7r", - "cellView": "form" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", - "source": [ - "#### Start publishing to PuBSub in background" - ], "metadata": { "id": "jnUSynmjEmVr" - } + }, + "source": [ + "#### Start publishing to PubSub in background" + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "ZnBBTwZHw7Ex" + }, + "outputs": [], "source": [ "# Launch publisher in a separate thread\n", "print(\"Starting publisher thread in 5 minutes...\")\n", @@ -2722,15 +2751,13 @@ "publisher_thread.start()\n", "print(f\"Publisher thread started with ID: {publisher_thread.ident}\")\n", "print(f\"Publisher thread 
logging to file: publisher_{publisher_thread.ident}.log\")" - ], - "metadata": { - "id": "ZnBBTwZHw7Ex" - }, - "execution_count": null, - "outputs": [] + ] }, { "cell_type": "markdown", + "metadata": { + "id": "vGToqM9GoKOV" + }, "source": [ "### Run Pipeline on Dataflow\n", "\n", @@ -2748,44 +2775,41 @@ "- Price and description changes reflected in the metadata\n", "- New embeddings generated for updated product descriptions\n", "- Timestamps showing when each record was last modified" - ], - "metadata": { - "id": "vGToqM9GoKOV" - } + ] }, { "cell_type": "code", - "source": [ - "# Run pipeline\n", - "pipeline.run().wait_until_finish()" - ], + "execution_count": null, "metadata": { "id": "NTibYI9rx46o" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "# Run pipeline\n", + "pipeline.run().wait_until_finish()" + ] }, { "cell_type": "markdown", - "source": [ - "### Verify data" - ], "metadata": { "id": "vX9VxJ82CTum" - } + }, + "source": [ + "### Verify data" + ] }, { "cell_type": "code", + "execution_count": null, + "metadata": { + "id": "zSb1UoCSznkW" + }, + "outputs": [], "source": [ "# Verify the results\n", "print(\"\\nAfter embedding generation:\")\n", "verify_embeddings_sqlalchemy(instance_uri=INSTANCE_URI, database=DB_NAME, table_name=table_name, user=DB_USER, password=DB_PASSWORD)" - ], - "metadata": { - "id": "zSb1UoCSznkW" - }, - "execution_count": null, - "outputs": [] + ] } ], "metadata": { @@ -2805,4 +2829,4 @@ }, "nbformat": 4, "nbformat_minor": 0 -} \ No newline at end of file +}
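
Reviewer note: the publisher cell reordered above simulates real-time product updates by mutating prices and descriptions and stamping each message before publishing it to PubSub. A minimal standalone sketch of that message shape is below; the field names (`id`, `description`, `price`, `updated_at`) and values are illustrative assumptions, not copied from the notebook's publisher code.

```python
import json
from datetime import datetime, timezone

def make_update_message(product_id, description, base_price, price_factor):
    """Build a simulated product-update message (illustrative schema only).

    The notebook's publisher thread defines the real message fields; this
    sketch only shows the general shape: a JSON payload with a modified
    price and an update timestamp, encoded to bytes as PubSub requires.
    """
    payload = {
        "id": product_id,
        "description": description,
        # Simulate a price change, rounded to cents.
        "price": round(base_price * price_factor, 2),
        # Timestamp so downstream consumers can track update times.
        "updated_at": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(payload).encode("utf-8")

msg = make_update_message("sku-123", "Red running shoes", 49.99, 1.1)
decoded = json.loads(msg.decode("utf-8"))
print(decoded["price"])  # 54.99
```

The streaming pipeline's transformation step would then parse messages of this shape back into dictionaries (and on into Chunk objects) before embedding.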