diff --git a/examples/read-hn.Dockerfile b/examples/read-hn.Dockerfile new file mode 100644 index 0000000..26a5c78 --- /dev/null +++ b/examples/read-hn.Dockerfile @@ -0,0 +1,12 @@ +FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim + +RUN apt-get update && apt-get install -y git + +RUN rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +ENV UV_SYSTEM_PYTHON=1 + +RUN uv pip install controlflow + diff --git a/examples/read_hn.py b/examples/read_hn.py new file mode 100644 index 0000000..75acb0f --- /dev/null +++ b/examples/read_hn.py @@ -0,0 +1,98 @@ +# /// script +# dependencies = ["controlflow"] +# /// + +import os +import sys +from pathlib import Path +from typing import Annotated, TypedDict + +import httpx +from prefect import flow, task +from prefect.artifacts import create_markdown_artifact +from prefect.docker import DockerImage +from prefect.runner.storage import GitCredentials, GitRepository +from pydantic import AnyHttpUrl, Field, TypeAdapter + +import controlflow as cf + + +class HNArticleSummary(TypedDict): + link: AnyHttpUrl + title: str + main_topics: Annotated[set[str], Field(min_length=1, max_length=5)] + key_takeaways: Annotated[set[str], Field(min_length=1, max_length=5)] + tech_domains: Annotated[set[str], Field(min_length=1, max_length=5)] + + +@cf.task(instructions="concise, main details") +def analyze_article(id: str) -> HNArticleSummary: + """Analyze a HackerNews article and return structured insights""" + content = httpx.get(f"https://hacker-news.firebaseio.com/v0/item/{id}.json").json() + return f"here is the article content: {content}" # type: ignore + + +@flow(retries=2) +def analyze_hn_articles(n: int = 5) -> list[HNArticleSummary]: + top_article_ids = httpx.get( + "https://hacker-news.firebaseio.com/v0/topstories.json" + ).json()[:n] + briefs = analyze_article.map(top_article_ids).result() + create_markdown_artifact( + key="hn-article-exec-summary", + markdown=task(task_run_name=f"make summary of {len(briefs)} articles")(cf.run)( + objective="markdown summary of all extracted article briefs", + result_type=Annotated[str, Field(description="markdown summary")], + context=dict(briefs=briefs), + ), + description="executive summary of all extracted article briefs", + ) + return briefs + + +if __name__ == "__main__": + EVERY_12_HOURS_CRON = "0 */12 * * *" + if len(sys.argv) > 1 and sys.argv[1] == "serve": + analyze_hn_articles.serve( + parameters={"n": 5}, + cron=EVERY_12_HOURS_CRON, + ) + elif len(sys.argv) > 1 and sys.argv[1] == "local_deploy": + analyze_hn_articles.from_source( + source=str((p := Path(__file__)).parent.resolve()), + entrypoint=f"{p.name}:analyze_hn_articles", + ).deploy( + name="local-deployment", + work_pool_name="local", + cron=EVERY_12_HOURS_CRON, + ) + elif len(sys.argv) > 1 and sys.argv[1] == "docker_deploy": + repo = GitRepository( + url="https://github.com/PrefectHQ/controlflow.git", + branch="main", + credentials=None, # replace with `dict(username="", access_token="")` for private repos + ) + analyze_hn_articles.from_source( + source=repo, + entrypoint="examples/read_hn.py:analyze_articles", + ).deploy( + name="docker-deployment", + # image=DockerImage( # uncomment and replace with your own image if desired + # name="zzstoatzz/cf-read-hn", + # tag="latest", + # dockerfile=str(Path(__file__).parent.resolve() / "read-hn.Dockerfile"), + # ), + work_pool_name="docker-work", # uv pip install -U prefect-docker prefect worker start --pool docker-work --type docker + cron=EVERY_12_HOURS_CRON, + parameters={"n": 5}, + job_variables={ + "env": {"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY")}, + "image": "zzstoatzz/cf-read-hn:latest", # publicly available image on dockerhub + }, + build=False, + push=False, + ) + else: + print(f"just running the code\n\n\n\n\n\n") + briefs = analyze_hn_articles(5) # type: ignore + TypeAdapter(list[HNArticleSummary]).validate_python(briefs) diff --git a/src/controlflow/decorators.py b/src/controlflow/decorators.py index 82df0e7..8cbf703 100644 --- a/src/controlflow/decorators.py +++ b/src/controlflow/decorators.py @@ -1,9 +1,12 @@ import asyncio import functools import inspect -from typing import Any, Callable, Optional, Union +from typing import Any, Callable, Optional, TypeVar, Union, cast +from prefect import Flow as PrefectFlow +from prefect import Task as PrefectTask from prefect.utilities.asyncutils import run_coro_as_sync +from typing_extensions import ParamSpec import controlflow from controlflow.agents import Agent @@ -14,11 +17,14 @@ # from controlflow.utilities.marvin import patch_marvin +P = ParamSpec("P") +R = TypeVar("R") + logger = get_logger(__name__) def flow( - fn: Optional[Callable[..., Any]] = None, + fn: Optional[Callable[P, R]] = None, *, thread: Optional[str] = None, instructions: Optional[str] = None, @@ -29,8 +35,8 @@ def flow( timeout_seconds: Optional[Union[float, int]] = None, prefect_kwargs: Optional[dict[str, Any]] = None, context_kwargs: Optional[list[str]] = None, - **kwargs: Optional[dict[str, Any]], -): + **kwargs: Any, +) -> Callable[[Callable[P, R]], PrefectFlow[P, R]]: """ A decorator that wraps a function as a ControlFlow flow. @@ -54,7 +60,7 @@ def flow( callable: The wrapped function or a new flow decorator if `fn` is not provided. """ if fn is None: - return functools.partial( + return functools.partial( # type: ignore flow, thread=thread, instructions=instructions, @@ -70,13 +76,15 @@ def flow( sig = inspect.signature(fn) def create_flow_context(bound_args): - flow_kwargs = kwargs.copy() + flow_kwargs: dict[str, Any] = kwargs.copy() if thread is not None: - flow_kwargs.setdefault("thread_id", thread) + flow_kwargs["thread_id"] = thread if tools is not None: - flow_kwargs.setdefault("tools", tools) + flow_kwargs["tools"] = tools if default_agent is not None: - flow_kwargs.setdefault("default_agent", default_agent) + flow_kwargs["default_agent"] = default_agent + + flow_kwargs.update(kwargs) context = {} if context_kwargs: @@ -92,7 +100,7 @@ def create_flow_context(bound_args): if asyncio.iscoroutinefunction(fn): @functools.wraps(fn) - async def wrapper(*wrapper_args, **wrapper_kwargs): + async def wrapper(*wrapper_args, **wrapper_kwargs): # type: ignore bound = sig.bind(*wrapper_args, **wrapper_kwargs) bound.apply_defaults() with ( @@ -112,17 +120,19 @@ def wrapper(*wrapper_args, **wrapper_kwargs): ): return fn(*wrapper_args, **wrapper_kwargs) - wrapper = prefect_flow( - timeout_seconds=timeout_seconds, - retries=retries, - retry_delay_seconds=retry_delay_seconds, - **(prefect_kwargs or {}), - )(wrapper) - return wrapper + return cast( + Callable[[Callable[P, R]], PrefectFlow[P, R]], + prefect_flow( + timeout_seconds=timeout_seconds, + retries=retries, + retry_delay_seconds=retry_delay_seconds, + **(prefect_kwargs or {}), + )(wrapper), + ) def task( - fn: Optional[Callable[..., Any]] = None, + fn: Optional[Callable[P, R]] = None, *, objective: Optional[str] = None, instructions: Optional[str] = None, @@ -133,8 +143,8 @@ def task( retries: Optional[int] = None, retry_delay_seconds: Optional[Union[float, int]] = None, timeout_seconds: Optional[Union[float, int]] = None, - **task_kwargs: Optional[dict[str, Any]], -): + **task_kwargs: Any, +) -> Callable[[Callable[P, R]], PrefectTask[P, R]]: """ A decorator that turns a Python function into a Task. The Task objective is set to the function name, and the instructions are set to the function @@ -157,78 +167,68 @@ def task( callable: The wrapped function or a new task decorator if `fn` is not provided. """ - if fn is None: - return functools.partial( - task, - objective=objective, - instructions=instructions, - name=name, - agents=agents, - tools=tools, - interactive=interactive, - retries=retries, - retry_delay_seconds=retry_delay_seconds, - timeout_seconds=timeout_seconds, - **task_kwargs, - ) - - sig = inspect.signature(fn) - - if name is None: - name = fn.__name__ - - if objective is None: - objective = fn.__doc__ or "" + def decorator(func: Callable[P, R]) -> PrefectTask[P, R]: + sig = inspect.signature(func) - result_type = fn.__annotations__.get("return") - - def _get_task(*args, **kwargs) -> Task: - # first process callargs - bound = sig.bind(*args, **kwargs) - bound.apply_defaults() - context = bound.arguments.copy() - - # call the function to see if it produces an updated objective - maybe_coro = fn(*args, **kwargs) - if asyncio.iscoroutine(maybe_coro): - result = run_coro_as_sync(maybe_coro) + if name is None: + task_name = func.__name__ else: - result = maybe_coro - if result is not None: - context["Additional context"] = result + task_name = name - return Task( - objective=objective, - instructions=instructions, - name=name, - agents=agents, - context=context, - result_type=result_type, - interactive=interactive or False, - tools=tools or [], - **task_kwargs, - ) + if objective is None: + task_objective = func.__doc__ or "" + else: + task_objective = objective - if asyncio.iscoroutinefunction(fn): + result_type = func.__annotations__.get("return") - @functools.wraps(fn) - async def wrapper(*args, **kwargs): - task = _get_task(*args, **kwargs) - return await task.run_async() - else: + def _get_task(*args, **kwargs) -> Task: + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + context = bound.arguments.copy() + + maybe_coro = func(*args, **kwargs) + if asyncio.iscoroutine(maybe_coro): + result = run_coro_as_sync(maybe_coro) + else: + result = maybe_coro + if result is not None: + context["Additional context"] = result + + return Task( + objective=task_objective, + instructions=instructions, + name=task_name, + agents=agents, + context=context, + result_type=result_type, + interactive=interactive or False, + tools=tools or [], + **task_kwargs, + ) + + if asyncio.iscoroutinefunction(func): + + @functools.wraps(func) + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: # type: ignore + task = _get_task(*args, **kwargs) + return await task.run_async() # type: ignore + else: - @functools.wraps(fn) - def wrapper(*args, **kwargs): - task = _get_task(*args, **kwargs) - return task.run() + @functools.wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: + task = _get_task(*args, **kwargs) + return task.run() # type: ignore - wrapper = prefect_task( - timeout_seconds=timeout_seconds, - retries=retries, - retry_delay_seconds=retry_delay_seconds, - )(wrapper) + prefect_wrapper = prefect_task( + timeout_seconds=timeout_seconds, + retries=retries, + retry_delay_seconds=retry_delay_seconds, + )(wrapper) - # store the `as_task` method for loading the task object - wrapper.as_task = _get_task + setattr(prefect_wrapper, "as_task", _get_task) + return cast(PrefectTask[P, R], prefect_wrapper) - return wrapper + if fn is None: + return decorator + return decorator(fn) # type: ignore diff --git a/src/controlflow/instructions.py b/src/controlflow/instructions.py index 55fbaf7..eb86327 100644 --- a/src/controlflow/instructions.py +++ b/src/controlflow/instructions.py @@ -1,5 +1,5 @@ from contextlib import contextmanager -from typing import Generator, List +from typing import Generator, List, Union from controlflow.utilities.context import ctx from controlflow.utilities.logging import get_logger @@ -8,7 +8,9 @@ @contextmanager -def instructions(instructions: str) -> Generator[list[str], None, None]: +def instructions( + instructions: Union[str, None], +) -> Generator[Union[list[str], None], None, None]: """ Temporarily add instructions to the current instruction stack. The instruction is removed when the context is exited.