Skip to content

Commit

Permalink
make friendly to any user
Browse files Browse the repository at this point in the history
better example

updates

fix kwargs
  • Loading branch information
zzstoatzz committed Nov 13, 2024
1 parent 85fa4fd commit 329cf8e
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 135 deletions.
53 changes: 0 additions & 53 deletions examples/prefect_deploy.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ RUN rm -rf /var/lib/apt/lists/*
WORKDIR /app

ENV UV_SYSTEM_PYTHON=1
ENV PATH="/root/.local/bin:$PATH"

RUN uv pip install controlflow

99 changes: 99 additions & 0 deletions examples/read_hn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# /// script
# dependencies = ["controlflow"]
# ///

import os
import sys
from pathlib import Path
from typing import Annotated, TypedDict

import httpx
from prefect.artifacts import create_markdown_artifact
from prefect.blocks.system import Secret
from prefect.docker import DockerImage
from prefect.runner.storage import GitCredentials, GitRepository
from pydantic import AnyHttpUrl, Field

import controlflow as cf


class HNArticleSummary(TypedDict):
link: AnyHttpUrl
title: str
main_topics: Annotated[set[str], Field(min_length=1, max_length=5)]
key_takeaways: Annotated[set[str], Field(min_length=1, max_length=5)]
tech_domains: Annotated[set[str], Field(min_length=1, max_length=5)]


@cf.task(instructions="concise, main details")
def analyze_article(id: str) -> HNArticleSummary:
"""Analyze a HackerNews article and return structured insights"""
content = httpx.get(f"https://hacker-news.firebaseio.com/v0/item/{id}.json").json()
return f"here is the article content: {content}" # type: ignore


@cf.task()
def summarize_article_briefs(
briefs: list[HNArticleSummary],
) -> Annotated[str, Field(description="markdown summary")]:
"""Summarize a list of article briefs"""
return f"here are the article briefs: {briefs}" # type: ignore


@cf.flow(retries=2)
def analyze_hn_articles(n: int = 5):
top_article_ids = httpx.get(
"https://hacker-news.firebaseio.com/v0/topstories.json"
).json()[:n]
briefs = analyze_article.map(top_article_ids).result()
create_markdown_artifact(
key="hn-article-exec-summary",
markdown=summarize_article_briefs(briefs),
)


if __name__ == "__main__":
EVERY_12_HOURS_CRON = "0 */12 * * *"
if len(sys.argv) > 1 and sys.argv[1] == "serve":
analyze_hn_articles.serve(
parameters={"n": 5},
cron=EVERY_12_HOURS_CRON,
)
elif len(sys.argv) > 1 and sys.argv[1] == "local_deploy":
analyze_hn_articles.from_source(
source=str((p := Path(__file__)).parent.resolve()),
entrypoint=f"{p.name}:analyze_hn_articles",
).deploy(
name="local-deployment",
work_pool_name="local",
cron=EVERY_12_HOURS_CRON,
)
elif len(sys.argv) > 1 and sys.argv[1] == "docker_deploy":
repo = GitRepository(
url="https://github.com/PrefectHQ/controlflow.git",
branch="main",
credentials=None, # replace with `dict(username="", access_token="")` for private repos
)
analyze_hn_articles.from_source(
source=repo,
entrypoint="examples/read_hn.py:analyze_articles",
).deploy(
name="docker-deployment",
# image=DockerImage( # uncomment and replace with your own image if desired
# name="zzstoatzz/cf-read-hn",
# tag="latest",
# dockerfile=str(Path(__file__).parent.resolve() / "read-hn.Dockerfile"),
# ),
work_pool_name="docker-work", # uv pip install -U prefect-docker prefect worker start --pool docker-work --type docker
cron=EVERY_12_HOURS_CRON,
parameters={"n": 5},
job_variables={
"env": {"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY")},
"image": "zzstoatzz/cf-read-hn:latest", # publicly available image on dockerhub
},
build=False,
push=False,
)
else:
print(f"just running the code\n\n\n\n\n\n")
analyze_hn_articles(5)
157 changes: 76 additions & 81 deletions src/controlflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Callable, Optional, TypeVar, Union, cast

from prefect import Flow as PrefectFlow
from prefect import Task as PrefectTask
from prefect.utilities.asyncutils import run_coro_as_sync
from typing_extensions import ParamSpec

Expand Down Expand Up @@ -34,7 +35,7 @@ def flow(
timeout_seconds: Optional[Union[float, int]] = None,
prefect_kwargs: Optional[dict[str, Any]] = None,
context_kwargs: Optional[list[str]] = None,
**kwargs: Optional[dict[str, Any]],
**kwargs: Any,
) -> Callable[[Callable[P, R]], PrefectFlow[P, R]]:
"""
A decorator that wraps a function as a ControlFlow flow.
Expand Down Expand Up @@ -75,13 +76,15 @@ def flow(
sig = inspect.signature(fn)

def create_flow_context(bound_args):
flow_kwargs = kwargs.copy()
flow_kwargs: dict[str, Any] = kwargs.copy()
if thread is not None:
flow_kwargs.setdefault("thread_id", thread) # type: ignore
flow_kwargs["thread_id"] = thread
if tools is not None:
flow_kwargs.setdefault("tools", tools) # type: ignore
flow_kwargs["tools"] = tools
if default_agent is not None:
flow_kwargs.setdefault("default_agent", default_agent) # type: ignore
flow_kwargs["default_agent"] = default_agent

flow_kwargs.update(kwargs)

context = {}
if context_kwargs:
Expand Down Expand Up @@ -117,17 +120,19 @@ def wrapper(*wrapper_args, **wrapper_kwargs):
):
return fn(*wrapper_args, **wrapper_kwargs)

prefect_wrapper = prefect_flow(
timeout_seconds=timeout_seconds,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
**(prefect_kwargs or {}),
)(wrapper)
return cast(Callable[[Callable[P, R]], PrefectFlow[P, R]], prefect_wrapper)
return cast(
Callable[[Callable[P, R]], PrefectFlow[P, R]],
prefect_flow(
timeout_seconds=timeout_seconds,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
**(prefect_kwargs or {}),
)(wrapper),
)


def task(
fn: Optional[Callable[..., Any]] = None,
fn: Optional[Callable[P, R]] = None,
*,
objective: Optional[str] = None,
instructions: Optional[str] = None,
Expand All @@ -138,8 +143,8 @@ def task(
retries: Optional[int] = None,
retry_delay_seconds: Optional[Union[float, int]] = None,
timeout_seconds: Optional[Union[float, int]] = None,
**task_kwargs: Optional[dict[str, Any]],
):
**task_kwargs: Any,
) -> Callable[[Callable[P, R]], PrefectTask[P, R]]:
"""
A decorator that turns a Python function into a Task. The Task objective is
set to the function name, and the instructions are set to the function
Expand All @@ -162,78 +167,68 @@ def task(
callable: The wrapped function or a new task decorator if `fn` is not provided.
"""

if fn is None:
return functools.partial(
task,
objective=objective,
instructions=instructions,
name=name,
agents=agents,
tools=tools,
interactive=interactive,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
timeout_seconds=timeout_seconds,
**task_kwargs,
)

sig = inspect.signature(fn)

if name is None:
name = fn.__name__

if objective is None:
objective = fn.__doc__ or ""
def decorator(func: Callable[P, R]) -> PrefectTask[P, R]:
sig = inspect.signature(func)

result_type = fn.__annotations__.get("return")

def _get_task(*args, **kwargs) -> Task:
# first process callargs
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
context = bound.arguments.copy()

# call the function to see if it produces an updated objective
maybe_coro = fn(*args, **kwargs)
if asyncio.iscoroutine(maybe_coro):
result = run_coro_as_sync(maybe_coro)
if name is None:
task_name = func.__name__
else:
result = maybe_coro
if result is not None:
context["Additional context"] = result
task_name = name

return Task(
objective=objective,
instructions=instructions,
name=name,
agents=agents,
context=context,
result_type=result_type,
interactive=interactive or False,
tools=tools or [],
**task_kwargs,
)
if objective is None:
task_objective = func.__doc__ or ""
else:
task_objective = objective

if asyncio.iscoroutinefunction(fn):
result_type = func.__annotations__.get("return")

@functools.wraps(fn)
async def wrapper(*args, **kwargs):
task = _get_task(*args, **kwargs)
return await task.run_async()
else:
def _get_task(*args, **kwargs) -> Task:
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
context = bound.arguments.copy()

maybe_coro = func(*args, **kwargs)
if asyncio.iscoroutine(maybe_coro):
result = run_coro_as_sync(maybe_coro)
else:
result = maybe_coro
if result is not None:
context["Additional context"] = result

return Task(
objective=task_objective,
instructions=instructions,
name=task_name,
agents=agents,
context=context,
result_type=result_type,
interactive=interactive or False,
tools=tools or [],
**task_kwargs,
)

if asyncio.iscoroutinefunction(func):

@functools.wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: # type: ignore
task = _get_task(*args, **kwargs)
return await task.run_async() # type: ignore
else:

@functools.wraps(fn)
def wrapper(*args, **kwargs):
task = _get_task(*args, **kwargs)
return task.run()
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
task = _get_task(*args, **kwargs)
return task.run() # type: ignore

prefect_wrapper = prefect_task(
timeout_seconds=timeout_seconds,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
)(wrapper)
prefect_wrapper = prefect_task(
timeout_seconds=timeout_seconds,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
)(wrapper)

# store the `as_task` method for loading the task object
prefect_wrapper.as_task = _get_task
setattr(prefect_wrapper, "as_task", _get_task)
return cast(PrefectTask[P, R], prefect_wrapper)

return cast(Callable[[Callable[..., Any]], Task], prefect_wrapper)
if fn is None:
return decorator
return decorator(fn) # type: ignore

0 comments on commit 329cf8e

Please sign in to comment.