Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deploy example and fix typing #382

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions examples/read-hn.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim

RUN apt-get update && apt-get install -y git

RUN rm -rf /var/lib/apt/lists/*

WORKDIR /app

ENV UV_SYSTEM_PYTHON=1

RUN uv pip install controlflow

98 changes: 98 additions & 0 deletions examples/read_hn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# /// script
# dependencies = ["controlflow"]
# ///

import os
import sys
from pathlib import Path
from typing import Annotated, TypedDict

import httpx
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact
from prefect.docker import DockerImage
from prefect.runner.storage import GitCredentials, GitRepository
from pydantic import AnyHttpUrl, Field, TypeAdapter

import controlflow as cf


class HNArticleSummary(TypedDict):
link: AnyHttpUrl
title: str
main_topics: Annotated[set[str], Field(min_length=1, max_length=5)]
key_takeaways: Annotated[set[str], Field(min_length=1, max_length=5)]
tech_domains: Annotated[set[str], Field(min_length=1, max_length=5)]


@cf.task(instructions="concise, main details")
def analyze_article(id: str) -> HNArticleSummary:
"""Analyze a HackerNews article and return structured insights"""
content = httpx.get(f"https://hacker-news.firebaseio.com/v0/item/{id}.json").json()
return f"here is the article content: {content}" # type: ignore


@flow(retries=2)
def analyze_hn_articles(n: int = 5) -> list[HNArticleSummary]:
top_article_ids = httpx.get(
"https://hacker-news.firebaseio.com/v0/topstories.json"
).json()[:n]
briefs = analyze_article.map(top_article_ids).result()
create_markdown_artifact(
key="hn-article-exec-summary",
markdown=task(task_run_name=f"make summary of {len(briefs)} articles")(cf.run)(
objective="markdown summary of all extracted article briefs",
result_type=Annotated[str, Field(description="markdown summary")],
context=dict(briefs=briefs),
),
description="executive summary of all extracted article briefs",
)
return briefs


if __name__ == "__main__":
EVERY_12_HOURS_CRON = "0 */12 * * *"
if len(sys.argv) > 1 and sys.argv[1] == "serve":
analyze_hn_articles.serve(
parameters={"n": 5},
cron=EVERY_12_HOURS_CRON,
)
elif len(sys.argv) > 1 and sys.argv[1] == "local_deploy":
analyze_hn_articles.from_source(
source=str((p := Path(__file__)).parent.resolve()),
entrypoint=f"{p.name}:analyze_hn_articles",
).deploy(
name="local-deployment",
work_pool_name="local",
cron=EVERY_12_HOURS_CRON,
)
elif len(sys.argv) > 1 and sys.argv[1] == "docker_deploy":
repo = GitRepository(
url="https://github.com/PrefectHQ/controlflow.git",
branch="main",
credentials=None, # replace with `dict(username="", access_token="")` for private repos
)
analyze_hn_articles.from_source(
source=repo,
entrypoint="examples/read_hn.py:analyze_articles",
).deploy(
name="docker-deployment",
# image=DockerImage( # uncomment and replace with your own image if desired
# name="zzstoatzz/cf-read-hn",
# tag="latest",
# dockerfile=str(Path(__file__).parent.resolve() / "read-hn.Dockerfile"),
# ),
work_pool_name="docker-work", # uv pip install -U prefect-docker prefect worker start --pool docker-work --type docker
cron=EVERY_12_HOURS_CRON,
parameters={"n": 5},
job_variables={
"env": {"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY")},
"image": "zzstoatzz/cf-read-hn:latest", # publicly available image on dockerhub
},
build=False,
push=False,
)
else:
print(f"just running the code\n\n\n\n\n\n")
briefs = analyze_hn_articles(5) # type: ignore
TypeAdapter(list[HNArticleSummary]).validate_python(briefs)
172 changes: 86 additions & 86 deletions src/controlflow/decorators.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import asyncio
import functools
import inspect
from typing import Any, Callable, Optional, Union
from typing import Any, Callable, Optional, TypeVar, Union, cast

from prefect import Flow as PrefectFlow
from prefect import Task as PrefectTask
from prefect.utilities.asyncutils import run_coro_as_sync
from typing_extensions import ParamSpec

import controlflow
from controlflow.agents import Agent
Expand All @@ -14,11 +17,14 @@

# from controlflow.utilities.marvin import patch_marvin

P = ParamSpec("P")
R = TypeVar("R")

logger = get_logger(__name__)


def flow(
fn: Optional[Callable[..., Any]] = None,
fn: Optional[Callable[P, R]] = None,
*,
thread: Optional[str] = None,
instructions: Optional[str] = None,
Expand All @@ -29,8 +35,8 @@ def flow(
timeout_seconds: Optional[Union[float, int]] = None,
prefect_kwargs: Optional[dict[str, Any]] = None,
context_kwargs: Optional[list[str]] = None,
**kwargs: Optional[dict[str, Any]],
):
**kwargs: Any,
) -> Callable[[Callable[P, R]], PrefectFlow[P, R]]:
"""
A decorator that wraps a function as a ControlFlow flow.

Expand All @@ -54,7 +60,7 @@ def flow(
callable: The wrapped function or a new flow decorator if `fn` is not provided.
"""
if fn is None:
return functools.partial(
return functools.partial( # type: ignore
flow,
thread=thread,
instructions=instructions,
Expand All @@ -70,13 +76,15 @@ def flow(
sig = inspect.signature(fn)

def create_flow_context(bound_args):
flow_kwargs = kwargs.copy()
flow_kwargs: dict[str, Any] = kwargs.copy()
if thread is not None:
flow_kwargs.setdefault("thread_id", thread)
flow_kwargs["thread_id"] = thread
if tools is not None:
flow_kwargs.setdefault("tools", tools)
flow_kwargs["tools"] = tools
if default_agent is not None:
flow_kwargs.setdefault("default_agent", default_agent)
flow_kwargs["default_agent"] = default_agent

flow_kwargs.update(kwargs)

context = {}
if context_kwargs:
Expand All @@ -92,7 +100,7 @@ def create_flow_context(bound_args):
if asyncio.iscoroutinefunction(fn):

@functools.wraps(fn)
async def wrapper(*wrapper_args, **wrapper_kwargs):
async def wrapper(*wrapper_args, **wrapper_kwargs): # type: ignore
bound = sig.bind(*wrapper_args, **wrapper_kwargs)
bound.apply_defaults()
with (
Expand All @@ -112,17 +120,19 @@ def wrapper(*wrapper_args, **wrapper_kwargs):
):
return fn(*wrapper_args, **wrapper_kwargs)

wrapper = prefect_flow(
timeout_seconds=timeout_seconds,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
**(prefect_kwargs or {}),
)(wrapper)
return wrapper
return cast(
Callable[[Callable[P, R]], PrefectFlow[P, R]],
prefect_flow(
timeout_seconds=timeout_seconds,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
**(prefect_kwargs or {}),
)(wrapper),
)


def task(
fn: Optional[Callable[..., Any]] = None,
fn: Optional[Callable[P, R]] = None,
*,
objective: Optional[str] = None,
instructions: Optional[str] = None,
Expand All @@ -133,8 +143,8 @@ def task(
retries: Optional[int] = None,
retry_delay_seconds: Optional[Union[float, int]] = None,
timeout_seconds: Optional[Union[float, int]] = None,
**task_kwargs: Optional[dict[str, Any]],
):
**task_kwargs: Any,
) -> Callable[[Callable[P, R]], PrefectTask[P, R]]:
"""
A decorator that turns a Python function into a Task. The Task objective is
set to the function name, and the instructions are set to the function
Expand All @@ -157,78 +167,68 @@ def task(
callable: The wrapped function or a new task decorator if `fn` is not provided.
"""

if fn is None:
return functools.partial(
task,
objective=objective,
instructions=instructions,
name=name,
agents=agents,
tools=tools,
interactive=interactive,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
timeout_seconds=timeout_seconds,
**task_kwargs,
)

sig = inspect.signature(fn)

if name is None:
name = fn.__name__

if objective is None:
objective = fn.__doc__ or ""
def decorator(func: Callable[P, R]) -> PrefectTask[P, R]:
sig = inspect.signature(func)

result_type = fn.__annotations__.get("return")

def _get_task(*args, **kwargs) -> Task:
# first process callargs
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
context = bound.arguments.copy()

# call the function to see if it produces an updated objective
maybe_coro = fn(*args, **kwargs)
if asyncio.iscoroutine(maybe_coro):
result = run_coro_as_sync(maybe_coro)
if name is None:
task_name = func.__name__
else:
result = maybe_coro
if result is not None:
context["Additional context"] = result
task_name = name

return Task(
objective=objective,
instructions=instructions,
name=name,
agents=agents,
context=context,
result_type=result_type,
interactive=interactive or False,
tools=tools or [],
**task_kwargs,
)
if objective is None:
task_objective = func.__doc__ or ""
else:
task_objective = objective

if asyncio.iscoroutinefunction(fn):
result_type = func.__annotations__.get("return")

@functools.wraps(fn)
async def wrapper(*args, **kwargs):
task = _get_task(*args, **kwargs)
return await task.run_async()
else:
def _get_task(*args, **kwargs) -> Task:
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
context = bound.arguments.copy()

maybe_coro = func(*args, **kwargs)
if asyncio.iscoroutine(maybe_coro):
result = run_coro_as_sync(maybe_coro)
else:
result = maybe_coro
if result is not None:
context["Additional context"] = result

return Task(
objective=task_objective,
instructions=instructions,
name=task_name,
agents=agents,
context=context,
result_type=result_type,
interactive=interactive or False,
tools=tools or [],
**task_kwargs,
)

if asyncio.iscoroutinefunction(func):

@functools.wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: # type: ignore
task = _get_task(*args, **kwargs)
return await task.run_async() # type: ignore
else:

@functools.wraps(fn)
def wrapper(*args, **kwargs):
task = _get_task(*args, **kwargs)
return task.run()
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
task = _get_task(*args, **kwargs)
return task.run() # type: ignore

wrapper = prefect_task(
timeout_seconds=timeout_seconds,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
)(wrapper)
prefect_wrapper = prefect_task(
timeout_seconds=timeout_seconds,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
)(wrapper)

# store the `as_task` method for loading the task object
wrapper.as_task = _get_task
setattr(prefect_wrapper, "as_task", _get_task)
return cast(PrefectTask[P, R], prefect_wrapper)

return wrapper
if fn is None:
return decorator
return decorator(fn) # type: ignore
6 changes: 4 additions & 2 deletions src/controlflow/instructions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from contextlib import contextmanager
from typing import Generator, List
from typing import Generator, List, Union

from controlflow.utilities.context import ctx
from controlflow.utilities.logging import get_logger
Expand All @@ -8,7 +8,9 @@


@contextmanager
def instructions(instructions: str) -> Generator[list[str], None, None]:
def instructions(
instructions: Union[str, None],
) -> Generator[Union[list[str], None], None, None]:
"""
Temporarily add instructions to the current instruction stack. The
instruction is removed when the context is exited.
Expand Down