
Commit

rm prints
zzstoatzz committed Nov 4, 2024
1 parent a614ae3 commit 08e6d70
Showing 3 changed files with 68 additions and 42 deletions.
92 changes: 56 additions & 36 deletions docs/ingest_strategy.md
@@ -8,64 +8,83 @@ When building RAG applications, you often need to load and refresh content from
We use [Prefect](https://docs.prefect.io) to handle these challenges, giving us:

- Automatic caching of expensive operations
-- Concurrent processing with backpressure
+- Concurrent processing
- Observability and retries

Let's look at a real example that demonstrates these concepts.

## Building a Knowledge Base

```python
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "prefect",
# "raggy[tpuf]",
# ]
# ///

+from itertools import chain
from datetime import timedelta
-import httpx
-from prefect import flow, task
-from prefect.tasks import task_input_hash
+from prefect import flow, task, unmapped
+from prefect.cache_policies import INPUTS

+from raggy.documents import Document
+from raggy.loaders.base import Loader
from raggy.loaders.github import GitHubRepoLoader
from raggy.loaders.web import SitemapLoader
from raggy.vectorstores.tpuf import TurboPuffer

-# Cache based on content changes
-def get_last_modified(context, parameters):
-    """Only reload if the content has changed."""
-    try:
-        return httpx.head(parameters["urls"][0]).headers.get("Last-Modified", "")
-    except Exception:
-        return None

@task(
-    cache_key_fn=get_last_modified,
+    cache_policy=INPUTS,
    cache_expiration=timedelta(hours=24),
    task_run_name="gather documents using {loader.__class__.__name__}",
    retries=2,
)
-async def gather_documents(urls: list[str]):
-    return await SitemapLoader(urls=urls).load()
+async def gather_documents(loader: Loader) -> list[Document]:
+    return await loader.load()

+@flow(flow_run_name="refresh knowledge in {namespace} from urls {urls} and repos {repos}")
+def refresh_knowledge(
+    urls: list[str] | None = None,
+    repos: list[str] | None = None,
+    include_globs: list[str] | None = None,
+    namespace: str = "knowledge",
+):

-@flow
-async def refresh_knowledge():
-    # Load from multiple sources
-    documents = []
-    for loader in [
-        SitemapLoader(urls=["https://docs.prefect.io/sitemap.xml"]),
-        GitHubRepoLoader(repo="PrefectHQ/prefect", include_globs=["README.md"]),
-    ]:
-        documents.extend(await gather_documents(loader))
-
-    # Store efficiently with concurrent embedding
-    with TurboPuffer(namespace="knowledge") as tpuf:
-        await tpuf.upsert_batched(
-            documents,
-            batch_size=100,  # tune based on document size
-            max_concurrent=8  # tune based on rate limits
-        )
+    _2d_list_of_documents = gather_documents.map(
+        [
+            SitemapLoader(urls=urls),
+            *[
+                GitHubRepoLoader(repo=repo, include_globs=include_globs or ["README.md"])
+                for repo in repos
+            ],
+        ]
+    ).result()
+
+    # batch embedding and upserts to the vector store
+    with TurboPuffer(namespace=namespace) as tpuf:
+        task(tpuf.upsert_batched).submit(
+            documents=list(chain.from_iterable(_2d_list_of_documents)),
+            batch_size=unmapped(100),  # tune based on document size
+            max_concurrent=unmapped(8),  # tune based on rate limits
+        ).wait()
+
+if __name__ == "__main__":
+    refresh_knowledge(
+        urls=["https://docs.prefect.io/sitemap.xml"],
+        repos=["PrefectHQ/prefect"],
+        include_globs=["README.md"],
+        namespace="test-knowledge",
+    )
```
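
A note on the fan-out used in `refresh_knowledge` above: `gather_documents.map(...)` creates one task run per loader and returns a future per element, while `unmapped(...)` marks an argument that should be passed whole to every run rather than iterated over. Here is a minimal, self-contained sketch of the same pattern with a toy task; the task name and numbers are illustrative, not part of this repo:

```python
from prefect import flow, task, unmapped


@task
def scale(x: int, factor: int) -> int:
    return x * factor


@flow
def scale_all() -> list[int]:
    # One task run per element of [1, 2, 3]; `factor` is passed whole to each run.
    futures = scale.map([1, 2, 3], factor=unmapped(10))
    return [future.result() for future in futures]


if __name__ == "__main__":
    print(scale_all())  # [10, 20, 30]
```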

This example shows key patterns:

-1. Content-aware caching (`Last-Modified` headers, commit SHAs, etc)
-2. Automatic retries for resilience
-3. Concurrent processing with backpressure
-4. Efficient batching of embedding operations
+1. Automatic retries for resilience
+2. Concurrent processing
+3. Efficient batching of embedding operations

See the [refresh examples](https://github.com/zzstoatzz/raggy/tree/main/examples/refresh_vectorstore) for complete implementations using both Chroma and TurboPuffer.
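
The third pattern, efficient batching, is what `upsert_batched` handles for you: slice the documents into fixed-size batches and cap how many are embedded and written concurrently. A rough, store-agnostic sketch of that shape in plain `asyncio` (not raggy's actual implementation; `embed_and_upsert` is a hypothetical stand-in for the embedding call plus the vector-store write):

```python
import asyncio
from typing import Sequence


async def embed_and_upsert(batch: Sequence[str]) -> None:
    """Hypothetical stand-in: embed a batch and write it to the vector store."""
    await asyncio.sleep(0.1)  # simulate the network round trip


async def upsert_in_batches(
    documents: Sequence[str],
    batch_size: int = 100,
    max_concurrent: int = 8,
) -> None:
    """Upsert documents in fixed-size batches with bounded concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent)
    batches = [
        documents[i : i + batch_size] for i in range(0, len(documents), batch_size)
    ]

    async def _upsert(batch: Sequence[str]) -> None:
        async with semaphore:  # at most max_concurrent batches in flight
            await embed_and_upsert(batch)

    await asyncio.gather(*(_upsert(batch) for batch in batches))


if __name__ == "__main__":
    asyncio.run(upsert_in_batches([f"document {i}" for i in range(500)]))
```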

@@ -77,9 +96,10 @@ For production workloads:

```python
    retries=2,
    retry_delay_seconds=[3, 60],  # exponential backoff
    cache_expiration=timedelta(days=1),
    cache_policy=INPUTS,  # for example, hash based on provided parameters
    persist_result=True,  # save results to storage
)
-async def gather_documents(loader):
+async def gather_documents(loader: Loader) -> list[Document]:
    return await loader.load()
```
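
In production you would also typically run the refresh on a schedule rather than by hand. One option is a served deployment with a cron schedule; the following is a minimal sketch assuming a recent Prefect release where `Flow.serve` accepts `cron` and `parameters` (the deployment name and cron string are illustrative, and the flow body is elided):

```python
from prefect import flow


@flow
def refresh_knowledge(namespace: str = "knowledge"):
    ...  # gather documents and upsert them, as shown above


if __name__ == "__main__":
    # Starts a long-lived process that submits a flow run every day at 03:00.
    refresh_knowledge.serve(
        name="nightly-knowledge-refresh",
        cron="0 3 * * *",
        parameters={"namespace": "knowledge"},
    )
```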

13 changes: 8 additions & 5 deletions src/raggy/vectorstores/chroma.py
@@ -56,7 +56,7 @@ def delete(
        self.collection.delete(
            ids=ids,
            where=where,
-            where_document=where_document,
+            where_document=where_document,  # type: ignore
        )

    def add(self, documents: Sequence[RaggyDocument]) -> list[ChromaDocument]:
@@ -96,7 +96,7 @@ def query(
        n_results: int = 10,
        where: dict | None = None,
        where_document: dict | None = None,
-        include: Include = ["metadatas"],
+        include: Include = ["metadatas"],  # type: ignore
        **kwargs,
    ) -> QueryResult:
        return self.collection.query(
@@ -130,7 +130,7 @@ def upsert(self, documents: Sequence[RaggyDocument]) -> list[ChromaDocument]:
                create_openai_embeddings([document.text for document in documents])
            ),
        )
-        self.collection.upsert(**kwargs)
+        self.collection.upsert(**kwargs)  # type: ignore

        get_result = self.collection.get(ids=kwargs["ids"])
        return get_result.get("documents") or []
@@ -192,7 +192,10 @@ async def _upsert(b=batch, n=i):

            # Do the upsert
            self.collection.upsert(**kwargs)
-            print(f"Upserted batch {n + 1}/{len(batches)} ({len(b)} documents)")
+            self.logger.debug_kv(
+                "Upserted",
+                f"Batch {n + 1}/{len(batches)} ({len(b)} documents)",
+            )

        tasks.append(_upsert)

@@ -215,7 +218,7 @@ def query_collection(
        n_results=top_k,
        where=where,
        where_document=where_document,
-        include=["documents"],
+        include=["documents"],  # type: ignore
    )

    assert (
5 changes: 4 additions & 1 deletion src/raggy/vectorstores/tpuf.py
@@ -177,7 +177,10 @@ async def _upsert(b=batch, n=i):
                vectors=embeddings,
                attributes={"text": texts},
            )
-            print(f"Upserted batch {n + 1}/{len(batches)} ({len(b)} documents)")
+            self.logger.debug_kv(
+                "Upserted",
+                f"Batch {n + 1}/{len(batches)} ({len(b)} documents)",
+            )

        tasks.append(_upsert)

