From 329cf8e1d880142594caa255425cefce098c6d8b Mon Sep 17 00:00:00 2001
From: zzstoatzz
Date: Tue, 12 Nov 2024 21:58:08 -0600
Subject: [PATCH] make friendly to any user better example updates fix kwargs

---
Review notes (free-form text in this section is not part of the commit
message and is skipped when the patch is applied):

  * examples/read_hn.py: the docker_deploy entrypoint now names
    `analyze_hn_articles`, the flow that the file actually defines and that
    the local_deploy branch already references. It previously pointed at
    `analyze_articles`, which does not exist in that file. The `index` blob
    id of that file predates this one-line correction.
  * Line breaks and indentation were restored from a whitespace-collapsed
    copy of this patch. Hunk line counts and the diffstat totals match the
    reconstruction, but the exact position of blank context lines (most
    notably in the Dockerfile hunk) and the wrap point of the docstring
    context in the `def task(` hunk are inferred -- confirm against the
    original commit before applying.

 examples/prefect_deploy.py                    |  53 ------
 ...t-deploy.Dockerfile => read-hn.Dockerfile} |   1 -
 examples/read_hn.py                           |  99 +++++++++++
 src/controlflow/decorators.py                 | 157 +++++++++---------
 4 files changed, 175 insertions(+), 135 deletions(-)
 delete mode 100644 examples/prefect_deploy.py
 rename examples/{prefect-deploy.Dockerfile => read-hn.Dockerfile} (85%)
 create mode 100644 examples/read_hn.py

diff --git a/examples/prefect_deploy.py b/examples/prefect_deploy.py
deleted file mode 100644
index b6c71c2..0000000
--- a/examples/prefect_deploy.py
+++ /dev/null
@@ -1,53 +0,0 @@
-import os
-import sys
-from pathlib import Path
-
-from prefect.docker import DockerImage
-from prefect.runner.storage import GitRepository
-
-import controlflow as cf
-
-
-@cf.task()
-def write_poem(topic: str) -> str:
-    """Write four lines that rhyme"""
-    return f"The topic is {topic}"
-
-
-@cf.flow()
-def write_poems(topics: list[str]) -> list[str]:
-    return write_poem.map(topics).result()
-
-
-if __name__ == "__main__":
-    if len(sys.argv) > 1 and sys.argv[1] == "serve":
-        write_poems.serve()
-    elif len(sys.argv) > 1 and sys.argv[1] == "local_deploy":
-        write_poems.from_source(
-            source=str((p := Path(__file__)).parent.resolve()),
-            entrypoint=f"{p.name}:write_poem",
-        ).deploy(name="local-deployment", work_pool_name="local-process-pool")
-    elif len(sys.argv) > 1 and sys.argv[1] == "docker_deploy":
-        repo = GitRepository(
-            url="https://github.com/PrefectHQ/controlflow.git",
-            branch="example-deploy",
-        )
-        write_poems.from_source(
-            source=repo,
-            entrypoint="examples/prefect_deploy.py:write_poems",
-        ).deploy(
-            name="docker-deployment",
-            image=DockerImage(
-                name="zzstoatzz/cf-test-deploy",
-                tag="latest",
-                dockerfile=str(
-                    Path(__file__).parent.resolve() / "prefect-deploy.Dockerfile"
-                ),
-            ),
-            work_pool_name="docker-work",
-            parameters={"topics": ["roses", "violets", "sugar", "spice"]},
-            job_variables={"env": {"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY")}},
-        )
-    else:
-        print(f"just running the code\n\n\n\n\n\n")
-        write_poems(["roses", "violets", "sugar", "spice"])
diff --git a/examples/prefect-deploy.Dockerfile b/examples/read-hn.Dockerfile
similarity index 85%
rename from examples/prefect-deploy.Dockerfile
rename to examples/read-hn.Dockerfile
index 55fb0fd..26a5c78 100644
--- a/examples/prefect-deploy.Dockerfile
+++ b/examples/read-hn.Dockerfile
@@ -7,7 +7,6 @@ RUN rm -rf /var/lib/apt/lists/*
 WORKDIR /app
 
 ENV UV_SYSTEM_PYTHON=1
-ENV PATH="/root/.local/bin:$PATH"
 
 RUN uv pip install controlflow
 
diff --git a/examples/read_hn.py b/examples/read_hn.py
new file mode 100644
index 0000000..e3dff0b
--- /dev/null
+++ b/examples/read_hn.py
@@ -0,0 +1,99 @@
+# /// script
+# dependencies = ["controlflow"]
+# ///
+
+import os
+import sys
+from pathlib import Path
+from typing import Annotated, TypedDict
+
+import httpx
+from prefect.artifacts import create_markdown_artifact
+from prefect.blocks.system import Secret
+from prefect.docker import DockerImage
+from prefect.runner.storage import GitCredentials, GitRepository
+from pydantic import AnyHttpUrl, Field
+
+import controlflow as cf
+
+
+class HNArticleSummary(TypedDict):
+    link: AnyHttpUrl
+    title: str
+    main_topics: Annotated[set[str], Field(min_length=1, max_length=5)]
+    key_takeaways: Annotated[set[str], Field(min_length=1, max_length=5)]
+    tech_domains: Annotated[set[str], Field(min_length=1, max_length=5)]
+
+
+@cf.task(instructions="concise, main details")
+def analyze_article(id: str) -> HNArticleSummary:
+    """Analyze a HackerNews article and return structured insights"""
+    content = httpx.get(f"https://hacker-news.firebaseio.com/v0/item/{id}.json").json()
+    return f"here is the article content: {content}"  # type: ignore
+
+
+@cf.task()
+def summarize_article_briefs(
+    briefs: list[HNArticleSummary],
+) -> Annotated[str, Field(description="markdown summary")]:
+    """Summarize a list of article briefs"""
+    return f"here are the article briefs: {briefs}"  # type: ignore
+
+
+@cf.flow(retries=2)
+def analyze_hn_articles(n: int = 5):
+    top_article_ids = httpx.get(
+        "https://hacker-news.firebaseio.com/v0/topstories.json"
+    ).json()[:n]
+    briefs = analyze_article.map(top_article_ids).result()
+    create_markdown_artifact(
+        key="hn-article-exec-summary",
+        markdown=summarize_article_briefs(briefs),
+    )
+
+
+if __name__ == "__main__":
+    EVERY_12_HOURS_CRON = "0 */12 * * *"
+    if len(sys.argv) > 1 and sys.argv[1] == "serve":
+        analyze_hn_articles.serve(
+            parameters={"n": 5},
+            cron=EVERY_12_HOURS_CRON,
+        )
+    elif len(sys.argv) > 1 and sys.argv[1] == "local_deploy":
+        analyze_hn_articles.from_source(
+            source=str((p := Path(__file__)).parent.resolve()),
+            entrypoint=f"{p.name}:analyze_hn_articles",
+        ).deploy(
+            name="local-deployment",
+            work_pool_name="local",
+            cron=EVERY_12_HOURS_CRON,
+        )
+    elif len(sys.argv) > 1 and sys.argv[1] == "docker_deploy":
+        repo = GitRepository(
+            url="https://github.com/PrefectHQ/controlflow.git",
+            branch="main",
+            credentials=None,  # replace with `dict(username="", access_token="")` for private repos
+        )
+        analyze_hn_articles.from_source(
+            source=repo,
+            entrypoint="examples/read_hn.py:analyze_hn_articles",
+        ).deploy(
+            name="docker-deployment",
+            # image=DockerImage(  # uncomment and replace with your own image if desired
+            #     name="zzstoatzz/cf-read-hn",
+            #     tag="latest",
+            #     dockerfile=str(Path(__file__).parent.resolve() / "read-hn.Dockerfile"),
+            # ),
+            work_pool_name="docker-work",  # uv pip install -U prefect-docker prefect worker start --pool docker-work --type docker
+            cron=EVERY_12_HOURS_CRON,
+            parameters={"n": 5},
+            job_variables={
+                "env": {"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY")},
+                "image": "zzstoatzz/cf-read-hn:latest",  # publicly available image on dockerhub
+            },
+            build=False,
+            push=False,
+        )
+    else:
+        print(f"just running the code\n\n\n\n\n\n")
+        analyze_hn_articles(5)
diff --git a/src/controlflow/decorators.py b/src/controlflow/decorators.py
index 92f06ef..8cbf703 100644
--- a/src/controlflow/decorators.py
+++ b/src/controlflow/decorators.py
@@ -4,6 +4,7 @@
 from typing import Any, Callable, Optional, TypeVar, Union, cast
 
 from prefect import Flow as PrefectFlow
+from prefect import Task as PrefectTask
 from prefect.utilities.asyncutils import run_coro_as_sync
 from typing_extensions import ParamSpec
 
@@ -34,7 +35,7 @@ def flow(
     timeout_seconds: Optional[Union[float, int]] = None,
     prefect_kwargs: Optional[dict[str, Any]] = None,
     context_kwargs: Optional[list[str]] = None,
-    **kwargs: Optional[dict[str, Any]],
+    **kwargs: Any,
 ) -> Callable[[Callable[P, R]], PrefectFlow[P, R]]:
     """
     A decorator that wraps a function as a ControlFlow flow.
@@ -75,13 +76,15 @@ def flow(
     sig = inspect.signature(fn)
 
     def create_flow_context(bound_args):
-        flow_kwargs = kwargs.copy()
+        flow_kwargs: dict[str, Any] = kwargs.copy()
         if thread is not None:
-            flow_kwargs.setdefault("thread_id", thread)  # type: ignore
+            flow_kwargs["thread_id"] = thread
         if tools is not None:
-            flow_kwargs.setdefault("tools", tools)  # type: ignore
+            flow_kwargs["tools"] = tools
         if default_agent is not None:
-            flow_kwargs.setdefault("default_agent", default_agent)  # type: ignore
+            flow_kwargs["default_agent"] = default_agent
+
+        flow_kwargs.update(kwargs)
 
         context = {}
         if context_kwargs:
@@ -117,17 +120,19 @@ def wrapper(*wrapper_args, **wrapper_kwargs):
             ):
                 return fn(*wrapper_args, **wrapper_kwargs)
 
-    prefect_wrapper = prefect_flow(
-        timeout_seconds=timeout_seconds,
-        retries=retries,
-        retry_delay_seconds=retry_delay_seconds,
-        **(prefect_kwargs or {}),
-    )(wrapper)
-    return cast(Callable[[Callable[P, R]], PrefectFlow[P, R]], prefect_wrapper)
+    return cast(
+        Callable[[Callable[P, R]], PrefectFlow[P, R]],
+        prefect_flow(
+            timeout_seconds=timeout_seconds,
+            retries=retries,
+            retry_delay_seconds=retry_delay_seconds,
+            **(prefect_kwargs or {}),
+        )(wrapper),
+    )
 
 
 def task(
-    fn: Optional[Callable[..., Any]] = None,
+    fn: Optional[Callable[P, R]] = None,
     *,
     objective: Optional[str] = None,
     instructions: Optional[str] = None,
@@ -138,8 +143,8 @@ def task(
     retries: Optional[int] = None,
     retry_delay_seconds: Optional[Union[float, int]] = None,
     timeout_seconds: Optional[Union[float, int]] = None,
-    **task_kwargs: Optional[dict[str, Any]],
-):
+    **task_kwargs: Any,
+) -> Callable[[Callable[P, R]], PrefectTask[P, R]]:
     """
     A decorator that turns a Python function into a Task. The Task objective is
     set to the function name, and the instructions are set to the function
@@ -162,78 +167,68 @@ def task(
         callable: The wrapped function or a new task decorator if `fn` is not provided.
     """
 
-    if fn is None:
-        return functools.partial(
-            task,
-            objective=objective,
-            instructions=instructions,
-            name=name,
-            agents=agents,
-            tools=tools,
-            interactive=interactive,
-            retries=retries,
-            retry_delay_seconds=retry_delay_seconds,
-            timeout_seconds=timeout_seconds,
-            **task_kwargs,
-        )
-
-    sig = inspect.signature(fn)
-
-    if name is None:
-        name = fn.__name__
-
-    if objective is None:
-        objective = fn.__doc__ or ""
+    def decorator(func: Callable[P, R]) -> PrefectTask[P, R]:
+        sig = inspect.signature(func)
 
-    result_type = fn.__annotations__.get("return")
-
-    def _get_task(*args, **kwargs) -> Task:
-        # first process callargs
-        bound = sig.bind(*args, **kwargs)
-        bound.apply_defaults()
-        context = bound.arguments.copy()
-
-        # call the function to see if it produces an updated objective
-        maybe_coro = fn(*args, **kwargs)
-        if asyncio.iscoroutine(maybe_coro):
-            result = run_coro_as_sync(maybe_coro)
+        if name is None:
+            task_name = func.__name__
         else:
-            result = maybe_coro
-        if result is not None:
-            context["Additional context"] = result
+            task_name = name
 
-        return Task(
-            objective=objective,
-            instructions=instructions,
-            name=name,
-            agents=agents,
-            context=context,
-            result_type=result_type,
-            interactive=interactive or False,
-            tools=tools or [],
-            **task_kwargs,
-        )
+        if objective is None:
+            task_objective = func.__doc__ or ""
+        else:
+            task_objective = objective
 
-    if asyncio.iscoroutinefunction(fn):
+        result_type = func.__annotations__.get("return")
 
-        @functools.wraps(fn)
-        async def wrapper(*args, **kwargs):
-            task = _get_task(*args, **kwargs)
-            return await task.run_async()
-    else:
+        def _get_task(*args, **kwargs) -> Task:
+            bound = sig.bind(*args, **kwargs)
+            bound.apply_defaults()
+            context = bound.arguments.copy()
+
+            maybe_coro = func(*args, **kwargs)
+            if asyncio.iscoroutine(maybe_coro):
+                result = run_coro_as_sync(maybe_coro)
+            else:
+                result = maybe_coro
+            if result is not None:
+                context["Additional context"] = result
+
+            return Task(
+                objective=task_objective,
+                instructions=instructions,
+                name=task_name,
+                agents=agents,
+                context=context,
+                result_type=result_type,
+                interactive=interactive or False,
+                tools=tools or [],
+                **task_kwargs,
+            )
+
+        if asyncio.iscoroutinefunction(func):
+
+            @functools.wraps(func)
+            async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:  # type: ignore
+                task = _get_task(*args, **kwargs)
+                return await task.run_async()  # type: ignore
+        else:
 
-        @functools.wraps(fn)
-        def wrapper(*args, **kwargs):
-            task = _get_task(*args, **kwargs)
-            return task.run()
+            @functools.wraps(func)
+            def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
+                task = _get_task(*args, **kwargs)
+                return task.run()  # type: ignore
 
-    prefect_wrapper = prefect_task(
-        timeout_seconds=timeout_seconds,
-        retries=retries,
-        retry_delay_seconds=retry_delay_seconds,
-    )(wrapper)
+        prefect_wrapper = prefect_task(
+            timeout_seconds=timeout_seconds,
+            retries=retries,
+            retry_delay_seconds=retry_delay_seconds,
+        )(wrapper)
 
-    # store the `as_task` method for loading the task object
-    prefect_wrapper.as_task = _get_task
+        setattr(prefect_wrapper, "as_task", _get_task)
+        return cast(PrefectTask[P, R], prefect_wrapper)
 
-    return cast(Callable[[Callable[..., Any]], Task], prefect_wrapper)
+    if fn is None:
+        return decorator
+    return decorator(fn)  # type: ignore