Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deploy example and fix typing #382

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions examples/deploy.py

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'something' here should probably be 'write_poems'.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, an example that deploys it by pulling an image, would solve all of my problems.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch @alkiviadis-savvopoulos!

fixed that and also added a docker example

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import sys
from pathlib import Path

import controlflow as cf


@cf.task()
def write_poem(topic: str) -> str:
"""Write four lines that rhyme"""
return f"The topic is {topic}"


@cf.flow()
def write_poems(topics: list[str]) -> list[str]:
return write_poem.map(topics).result()


if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "serve":
write_poems.serve()
elif len(sys.argv) > 1 and sys.argv[1] == "deploy":
write_poems.from_source(
source=str((p := Path(__file__)).parent.resolve()),
entrypoint=f"{p.name}:something",
).deploy(name="some-deployment", work_pool_name="local-process-pool")
else:
write_poems(["roses", "violets", "sugar", "spice"])
30 changes: 17 additions & 13 deletions src/controlflow/decorators.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import functools
import inspect
from typing import Any, Callable, Optional, Union
from typing import Any, Callable, Optional, ParamSpec, TypeVar, Union, cast

from prefect import Flow as PrefectFlow
from prefect.utilities.asyncutils import run_coro_as_sync

import controlflow
Expand All @@ -14,11 +15,14 @@

# from controlflow.utilities.marvin import patch_marvin

P = ParamSpec("P")
R = TypeVar("R")

logger = get_logger(__name__)


def flow(
fn: Optional[Callable[..., Any]] = None,
fn: Optional[Callable[P, R]] = None,
*,
thread: Optional[str] = None,
instructions: Optional[str] = None,
Expand All @@ -30,7 +34,7 @@ def flow(
prefect_kwargs: Optional[dict[str, Any]] = None,
context_kwargs: Optional[list[str]] = None,
**kwargs: Optional[dict[str, Any]],
):
) -> Callable[[Callable[P, R]], PrefectFlow[P, R]]:
"""
A decorator that wraps a function as a ControlFlow flow.

Expand All @@ -54,7 +58,7 @@ def flow(
callable: The wrapped function or a new flow decorator if `fn` is not provided.
"""
if fn is None:
return functools.partial(
return functools.partial( # type: ignore
flow,
thread=thread,
instructions=instructions,
Expand All @@ -72,11 +76,11 @@ def flow(
def create_flow_context(bound_args):
flow_kwargs = kwargs.copy()
if thread is not None:
flow_kwargs.setdefault("thread_id", thread)
flow_kwargs.setdefault("thread_id", thread) # type: ignore
if tools is not None:
flow_kwargs.setdefault("tools", tools)
flow_kwargs.setdefault("tools", tools) # type: ignore
if default_agent is not None:
flow_kwargs.setdefault("default_agent", default_agent)
flow_kwargs.setdefault("default_agent", default_agent) # type: ignore

context = {}
if context_kwargs:
Expand All @@ -92,7 +96,7 @@ def create_flow_context(bound_args):
if asyncio.iscoroutinefunction(fn):

@functools.wraps(fn)
async def wrapper(*wrapper_args, **wrapper_kwargs):
async def wrapper(*wrapper_args, **wrapper_kwargs): # type: ignore
bound = sig.bind(*wrapper_args, **wrapper_kwargs)
bound.apply_defaults()
with (
Expand All @@ -112,13 +116,13 @@ def wrapper(*wrapper_args, **wrapper_kwargs):
):
return fn(*wrapper_args, **wrapper_kwargs)

wrapper = prefect_flow(
prefect_wrapper = prefect_flow(
timeout_seconds=timeout_seconds,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
**(prefect_kwargs or {}),
)(wrapper)
return wrapper
return cast(Callable[[Callable[P, R]], PrefectFlow[P, R]], prefect_wrapper)


def task(
Expand Down Expand Up @@ -222,13 +226,13 @@ def wrapper(*args, **kwargs):
task = _get_task(*args, **kwargs)
return task.run()

wrapper = prefect_task(
prefect_wrapper = prefect_task(
timeout_seconds=timeout_seconds,
retries=retries,
retry_delay_seconds=retry_delay_seconds,
)(wrapper)

# store the `as_task` method for loading the task object
wrapper.as_task = _get_task
prefect_wrapper.as_task = _get_task

return wrapper
return cast(Callable[[Callable[..., Any]], Task], prefect_wrapper)
6 changes: 4 additions & 2 deletions src/controlflow/instructions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from contextlib import contextmanager
from typing import Generator, List
from typing import Generator, List, Union

from controlflow.utilities.context import ctx
from controlflow.utilities.logging import get_logger
Expand All @@ -8,7 +8,9 @@


@contextmanager
def instructions(instructions: str) -> Generator[list[str], None, None]:
def instructions(
instructions: Union[str, None],
) -> Generator[Union[list[str], None], None, None]:
"""
Temporarily add instructions to the current instruction stack. The
instruction is removed when the context is exited.
Expand Down
Loading