
Add CI for agent example within AI cookbook #28

Merged (7 commits) on Sep 26, 2024
29 changes: 29 additions & 0 deletions .github/workflows/unit-test.yml
@@ -0,0 +1,29 @@
name: Unit tests

on:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]

jobs:
  build:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        python-version: ["3.10", "3.11"]

    steps:
      - uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v3
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r dev/dev_requirements.txt
      - name: Test with pytest
        run: |
          pytest
7 changes: 6 additions & 1 deletion .gitignore
@@ -1,4 +1,9 @@
genai_cookbook/_build
env
.env
.DS_STORE
.DS_STORE
.idea
__pycache__

# Exclude `databricks sync` CLI command snapshots
.databricks
28 changes: 17 additions & 11 deletions agent_app_sample_code/02_data_pipeline.py
@@ -42,10 +42,6 @@

# COMMAND ----------

# MAGIC %run ./utils/install_aptget_package
Collaborator (author):
This notebook doesn't exist but also seems unnecessary to run


# COMMAND ----------

# MAGIC %md
# MAGIC ## Import the global configuration

@@ -56,7 +52,7 @@
# COMMAND ----------

# MAGIC %md
# MAGIC ## Set the MLflow experiement name
# MAGIC ## Set the MLflow experiment name
# MAGIC
# MAGIC Used to track information about this Data Pipeline that is used in the later notebooks.

@@ -291,21 +287,31 @@ def file_parser(

# COMMAND ----------

# MAGIC %run ./utils/load_uc_volume_to_delta_table
from utils.file_loading import load_files_to_df, apply_parsing_udf

# COMMAND ----------

load_uc_volume_to_delta_table(
raw_files_df = load_files_to_df(
spark=spark,
source_path=SOURCE_UC_VOLUME,
dest_table_name=DOCS_DELTA_TABLE,
)

parsed_files_df = apply_parsing_udf(
raw_files_df=raw_files_df,
# Modify this function to change the parser, extract additional metadata, etc
parse_file_udf=file_parser,
# The schema of the resulting Delta Table will follow the schema defined in ParserReturnValue
spark_dataframe_schema=typed_dicts_to_spark_schema(ParserReturnValue),
parsed_df_schema=typed_dicts_to_spark_schema(ParserReturnValue)
)

print(DOCS_DELTA_TABLE)
display(spark.table(DOCS_DELTA_TABLE))
# Write to a Delta Table
parsed_files_df.write.mode("overwrite").option(
"overwriteSchema", "true"
).saveAsTable(DOCS_DELTA_TABLE)

# Display for debugging
print(f"Parsed {parsed_files_df.count()} documents.")
parsed_files_df.display()

# Log the resulting table to MLflow
mlflow.log_input(
Empty file.
6 changes: 6 additions & 0 deletions agent_app_sample_code/tests/conftest.py
@@ -0,0 +1,6 @@
import sys
import os

# Add the root directory to sys.path, so that we can treat directories like
# agent_app_sample_code as modules
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
46 changes: 46 additions & 0 deletions agent_app_sample_code/tests/test_file_loading.py
@@ -0,0 +1,46 @@
import pytest
import pyspark
import pandas as pd

from agent_app_sample_code.utils.file_loading import load_files_to_df

@pytest.fixture(scope="module")
def spark():
    return (
        pyspark.sql.SparkSession.builder
        .master("local[1]")
        # Uncomment the following line for testing on Apple silicon locally
Collaborator:
Can we create some control flag such that we can run pytest locally (on Apple silicon) and get this to work, and then CI can have the flag the other way?

Collaborator (author):
Good call - actually, it looks like CI works with this stuff enabled by default too, so I'll just remove the comment.
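A minimal sketch of the control-flag idea, for reference (the `LOCAL_SPARK_TESTING` name is hypothetical, and as noted above the PR ended up keeping the config enabled everywhere):

```python
import os
import pyspark

def build_test_spark_session():
    """Build the pytest SparkSession, gating local-only config behind an env flag."""
    builder = (
        pyspark.sql.SparkSession.builder
        .master("local[1]")
        .config("spark.task.maxFailures", "1")  # avoid retrying failed Spark tasks
    )
    # Hypothetical flag: set LOCAL_SPARK_TESTING=1 when running pytest on Apple silicon.
    if os.environ.get("LOCAL_SPARK_TESTING") == "1":
        builder = builder.config("spark.driver.bindAddress", "127.0.0.1")
    return builder.getOrCreate()
```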

.config("spark.driver.bindAddress", "127.0.0.1")
.config("spark.task.maxFailures", "1") # avoid retry failed spark tasks
.getOrCreate()
)

def test_load_files_to_df(spark, tmpdir):
    temp_dir = tmpdir.mkdir("files_subdir")
    file_1 = temp_dir.join("file1.txt")
    file_2 = temp_dir.join("file2.txt")
    file_1.write("file1 content")
    file_2.write("file2 content")
    raw_files_df = load_files_to_df(spark, str(temp_dir)).drop("modificationTime").orderBy("path")
    assert raw_files_df.count() == 2
    raw_pandas_df = raw_files_df.toPandas()
    # Decode the content from bytes to string
    raw_pandas_df['content'] = raw_pandas_df['content'].apply(
        lambda x: bytes(x).decode('utf-8')
    )
    # Expected DataFrame
    expected_df = pd.DataFrame([{
        "path": f"file:{str(file_1)}",
        "length": len("file1 content"),
        "content": "file1 content",
    }, {
        "path": f"file:{str(file_2)}",
        "length": len("file2 content"),
        "content": "file2 content",
    }])
    pd.testing.assert_frame_equal(raw_pandas_df, expected_df)

def test_load_files_to_df_throws_if_no_files(spark, tmpdir):
    temp_dir = tmpdir.mkdir("files_subdir")
    with pytest.raises(Exception, match="does not contain any files"):
        load_files_to_df(spark, str(temp_dir))
Empty file.
2 changes: 1 addition & 1 deletion agent_app_sample_code/utils/build_retriever_index.py
@@ -73,7 +73,7 @@ def find_index(endpoint_name, index_name):

if create_index:
print(
f"Computing document embeddings and Vector Search Index {get_table_url}. This can take 15 minutes or much longer if you have a larger number of documents."
f"Computing document embeddings and Vector Search Index. This can take 15 minutes or much longer if you have a larger number of documents."
)

vsc.create_delta_sync_index_and_wait(
Collaborator:
if we want to go in this direction, should we make this change for all the util and validator notebooks?

Collaborator (author) @smurching, Sep 26, 2024:
Yeah, I think we should - to your point, let me also follow up with some links to docs (e.g. https://docs.databricks.com/en/files/workspace-modules.html#work-with-python-and-r-modules) on how to iterate on these modules, etc.
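As a rough illustration of the pattern under discussion, using names from this PR (the commented-out import is hypothetical, assuming `build_retriever_index` were converted the same way):

```python
# Old pattern: pull utilities into a notebook with %run.
# %run ./utils/load_uc_volume_to_delta_table

# New pattern: once the "# Databricks notebook source" header is removed, the file
# is a plain workspace module that can be imported (and unit tested) directly.
from utils.file_loading import load_files_to_df, apply_parsing_udf

# Hypothetical: the same conversion applied to another util notebook.
# from utils.build_retriever_index import find_index
```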

@@ -1,19 +1,3 @@
# Databricks notebook source
# MAGIC %md
Collaborator (author):
Can't import from notebooks - need to make this a normal python file by deleting the "Databricks notebook source" header here

# MAGIC ##### `load_uc_volume_to_delta_table`
# MAGIC
# MAGIC `load_uc_volume_to_delta_table` loads files from a specified source path into a Delta Table after parsing and extracting metadata.
# MAGIC
# MAGIC Arguments:
# MAGIC - source_path: The path to the folder of files. This should be a valid directory path where the files are stored.
# MAGIC - dest_table_name: The name of the destination Delta Table.
# MAGIC - parse_file_udf: A user-defined function that takes the bytes of the file, parses it, and returns the parsed content and metadata.
# MAGIC For example: `def parse_file(raw_doc_contents_bytes, doc_path): return {'doc_content': content, 'metadata': metadata}`
# MAGIC - spark_dataframe_schema: The schema of the resulting Spark DataFrame after parsing and metadata extraction.

# COMMAND ----------

import json
import traceback
from datetime import datetime
from typing import Any, Callable, TypedDict, Dict
@@ -22,9 +6,10 @@
import warnings
import pyspark.sql.functions as func
from pyspark.sql.types import StructType
from pyspark.sql import DataFrame, SparkSession


def parse_and_extract(
def _parse_and_extract(
raw_doc_contents_bytes: bytes,
Collaborator (author):
Making stuff private if it's not used outside this file

modification_time: datetime,
doc_bytes_length: int,
@@ -57,7 +42,7 @@ def parse_and_extract(
}


def get_parser_udf(
def _get_parser_udf(
# extract_metadata_udf: Callable[[[dict, Any]], str],
parse_file_udf: Callable[[[dict, Any]], str],
spark_dataframe_schema: StructType,
@@ -71,7 +56,7 @@ def get_parser_udf(
"""
# This UDF will load each file, parse the doc, and extract metadata.
parser_udf = func.udf(
lambda raw_doc_contents_bytes, modification_time, doc_bytes_length, doc_path: parse_and_extract(
lambda raw_doc_contents_bytes, modification_time, doc_bytes_length, doc_path: _parse_and_extract(
raw_doc_contents_bytes,
modification_time,
doc_bytes_length,
@@ -82,16 +67,18 @@
)
return parser_udf

def load_files_to_df(
spark: SparkSession,
source_path: str) -> DataFrame:
"""
Load files from a directory into a Spark DataFrame.
Each row in the DataFrame will contain the path, length, and content of the file; for more
details, see https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
"""

def load_uc_volume_to_delta_table(
source_path: str,
Collaborator (author):
This util is hard to test because it does so much; it's easier to break it down into DataFrame operations (load files into a DF, apply parsing, etc.) and test those.

dest_table_name: str,
parse_file_udf: Callable[[[dict, Any]], str],
spark_dataframe_schema: StructType
) -> str:
if not os.path.exists(source_path):
raise ValueError(
f"{source_path} passed to `load_uc_volume_to_delta_table` does not exist."
f"{source_path} passed to `load_uc_volume_files` does not exist."
)

# Load the raw files
@@ -105,12 +92,19 @@ def load_uc_volume_to_delta_table(
raise Exception(f"`{source_path}` does not contain any files.")

print(f"Found {raw_files_df.count()} files in {source_path}.")
display(raw_files_df)
raw_files_df.show()
return raw_files_df

print()

def apply_parsing_udf(raw_files_df: DataFrame, parse_file_udf: Callable[[[dict, Any]], str], parsed_df_schema: StructType) -> DataFrame:
"""
Apply a file-parsing UDF to a DataFrame whose rows correspond to file content/metadata loaded via
https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
Returns a DataFrame with the parsed content and metadata.
"""
print("Running parsing & metadata extraction UDF in spark...")

parser_udf = get_parser_udf(parse_file_udf, spark_dataframe_schema)
parser_udf = _get_parser_udf(parse_file_udf, parsed_df_schema)

# Run the parsing
parsed_files_staging_df = raw_files_df.withColumn(
@@ -127,7 +121,7 @@ def load_uc_volume_to_delta_table(
display_markdown(
f"### {num_errors} documents had parse errors. Please review.", raw=True
)
display(errors_df)
errors_df.show()
Collaborator (author) @smurching, Sep 26, 2024:
`display()` doesn't work in unit tests; I've asked folks about it in Slack.

Collaborator:
That's unfortunate. If we're not writing tests that actually look at what is displayed, can we just monkeypatch this out?
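A rough sketch of the monkeypatch idea (the module path follows this PR, but the test itself is illustrative rather than part of the change):

```python
import agent_app_sample_code.utils.file_loading as file_loading

def test_parsing_without_notebook_display(monkeypatch):
    # Stub out a notebook-only display() helper so the utility runs under plain
    # pytest; raising=False tolerates the attribute not existing, since this PR
    # replaced display() with DataFrame.show().
    monkeypatch.setattr(file_loading, "display", lambda *args, **kwargs: None, raising=False)
    # The test would then call apply_parsing_udf(...) and assert on the returned
    # DataFrame, without anything being rendered.
```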


if errors_df.count() == parsed_files_staging_df.count():
raise ValueError(
@@ -139,34 +133,21 @@ def load_uc_volume_to_delta_table(
display_markdown(
f"### {num_errors} documents have no content. Please review.", raw=True
)
display(errors_df)
errors_df.show()

if num_empty_content == parsed_files_staging_df.count():
raise ValueError("All documents are empty. Please review.")

# Filter for successfully parsed files
# Change the schema to the resulting schema
resulting_fields = [field.name for field in spark_dataframe_schema.fields]
resulting_fields = [field.name for field in parsed_df_schema.fields]

parsed_files_df = parsed_files_staging_df.filter(
parsed_files_staging_df.parsing.parser_status == "SUCCESS"
)

# display(parsed_files_df)
parsed_files_df.show()
parsed_files_df = parsed_files_df.select(
*[func.col(f"parsing.{field}").alias(field) for field in resulting_fields]
)

# Write to a Delta Table and overwrite it.
parsed_files_df.write.mode("overwrite").option(
"overwriteSchema", "true"
).saveAsTable(dest_table_name)

# Reload to get correct lineage in UC.
parsed_files_df = spark.table(dest_table_name)

# Display for debugging
print(f"Parsed {parsed_files_df.count()} documents.")
# display(parsed_files_df)

return dest_table_name
return parsed_files_df
4 changes: 2 additions & 2 deletions dev/README.md
@@ -3,11 +3,11 @@
To start working on this book:
- clone the repo; `cd cookbook`
- use your preferred approach to starting a new python environment
- in that environment, `pip install jupyter-book`
- in that environment, `pip install -r dev/dev_requirements.txt`
Collaborator (author):
Adding dev requirements needed to run tests, lint, etc

- build and preview the site with `jupyter-book build --all genai_cookbook`

The homepage is at `genai_cookbook/index.md`

The content pages are in `genai_cookbook/nbs/`

Jupyter book is fairly flexible and offers a lot of different options for formatting, cross-referencing, adding formatted callouts, etc. Read more at the [Jupyter Book docs](https://jupyterbook.org/en/stable/intro.html).
Jupyter book is fairly flexible and offers a lot of different options for formatting, cross-referencing, adding formatted callouts, etc. Read more at the [Jupyter Book docs](https://jupyterbook.org/en/stable/intro.html).
8 changes: 7 additions & 1 deletion dev/dev_requirements.txt
@@ -1,2 +1,8 @@
jupyter-book
livereload
livereload
black
pytest
mlflow-skinny[databricks]
databricks-vectorsearch
pyspark
pandas