Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into revert-5972-terryhung/fix-…
Browse files Browse the repository at this point in the history
…the-config-error

Signed-off-by: Eduardo Apolinario <eapolinario@users.noreply.github.com>
  • Loading branch information
eapolinario committed Dec 11, 2024
2 parents faf4d1d + ab4290e commit 5b18d35
Show file tree
Hide file tree
Showing 3 changed files with 3,730 additions and 3,996 deletions.
99 changes: 39 additions & 60 deletions docs/user_guide/advanced_composition/eager_workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
```

```{important}
This feature is experimental and the API is subject to breaking changes.
This feature is in beta and the API is still subject to minor changes.
If you encounter any issues please consider submitting a
[bug report](https://github.com/flyteorg/flyte/issues/new?assignees=&labels=bug%2Cuntriaged&projects=&template=bug_report.yaml&title=%5BBUG%5D+).
```
Expand Down Expand Up @@ -57,8 +57,7 @@ tasks, static subworkflows, and even other eager subworkflows in an _eager_
fashion such that we can materialize their outputs and use them inside the
parent eager workflow itself.

In the `simple_eager_workflow` function, we can see that we're `await`ing
the output of the `add_one` task and assigning it to the `out` variable. If
In the `simple_eager_workflow` function, we can call the `add_one` task and assigning it to the `out` variable. If
`out` is a negative integer, the workflow will return `-1`. Otherwise, it
will double the output of `add_one` and return it.

Expand All @@ -67,12 +66,17 @@ the Python integer that is the result of `x + 1` and not a promise.

## How it works

When you decorate a function with `@eager`, any function invoked within it
that's decorated with `@task`, `@workflow`, or `@eager` becomes
an [awaitable](https://docs.python.org/3/library/asyncio-task.html#awaitables)
object within the lifetime of the parent eager workflow execution. Note that
this happens automatically and you don't need to use the `async` keyword when
defining a task or workflow that you want to invoke within an eager workflow.
### Parallels to Python `asyncio`
The eager paradigm was written around Python's native `async` functionality. As such, it follows the same rules and
constructs and if you're used to working with async functions, you should be able to apply the exact same understanding to work with eager tasks.

In the example above, the tasks `add_one` and `double` are normal Flyte tasks and the functions being decorated are normal Python functions. This means that in the execution of the async task `simple_eager_workflow` will block on each of these functions just like Python would if these were simply just Python functions.

If you want to run functions in parallel, you will need to use `async` marked tasks, just like you would in normal Python.

Note that `eager` tasks share the same limitation as Python async functions. You can only call an `async` function inside another `async` function, or within a special handler (like `asyncio.run`). This means that until the `@workflow` decorator supports async workflow function definitions, which is doesn't today, you will not be able to call eager tasks or other async Python function tasks, inside workflows. This functionality is slated to be added in future releases. For the time being, you will need to run the tasks directly, either from FlyteRemote or the Flyte UI.

Unlike Python async however, when an `eager` task runs `async` sub-tasks in a real backend execution (not a local execution), it is doing real, wall-clock time parallelism, not just concurrency (assuming your K8s cluster is appropriately sized).

```{important}
With eager workflows, you basically have access to the Python `asyncio`
Expand All @@ -85,10 +89,10 @@ We're leveraging Python's native `async` capabilities in order to:
1. Materialize the output of flyte tasks and subworkflows so you can operate
on them without spinning up another pod and also determine the shape of the
workflow graph in an extremely flexible manner.
2. Provide an alternative way of achieving concurrency in Flyte. Flyte has
2. Provide an alternative way of achieving wall-time parallelism in Flyte. Flyte has
concurrency built into it, so all tasks/subworkflows will execute concurrently
assuming that they don't have any dependencies on each other. However, eager
workflows provide a python-native way of doing this, with the main downside
assuming that they don't have any dependencies on each other. However, `eager`
tasks provide a Python-native way of doing this, with the main downside
being that you lose the benefits of statically compiled workflows such as
compile-time analysis and first-class data lineage tracking.
```
Expand Down Expand Up @@ -142,7 +146,7 @@ to check if `out` is negative, but we're also using the `gt_100` task in the

### Loops

You can also gather the outputs of multiple tasks or subworkflows into a list:
You can also gather the outputs of multiple tasks or subworkflows into a list. Keep in mind that in this case, you will need to use an `async` function task, since normal tasks will block.

```{literalinclude} /examples/advanced_composition/advanced_composition/eager_workflows.py
:caption: advanced_composition/eager_workflows.py
Expand Down Expand Up @@ -206,60 +210,38 @@ developing your workflows and tasks.

(eager_workflows_remote)=

### Remote Flyte cluster execution
### Setting up remote Flyte cluster access

Under the hood, `@eager` workflows use the {py:class}`~flytekit.remote.remote.FlyteRemote`
object to kick off task, static workflow, and eager workflow executions.

In order to actually execute them on a Flyte cluster, you'll need to configure
eager workflows with a `FlyteRemote` object and secrets configuration that
allows you to authenticate into the cluster via a client secret key:

```{code-block} python
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config
@eager(
remote=FlyteRemote(
config=Config.auto(config_file="config.yaml"),
default_project="flytesnacks",
default_domain="development",
),
client_secret_group="<my_client_secret_group>",
client_secret_key="<my_client_secret_key>",
)
async def eager_workflow_remote(x: int) -> int:
...
object to kick off task, static workflow, and eager workflow executions. In order to create a `FlyteRemote` instance, `Config.auto()` is run and the resulting config object is passed into `FlyteRemote`.

This means that you just need to ensure eager workflow pods are run with the environment variables that including any mounted secrets for a client secret. For instance, the following three are sufficient in the most basic setup.

```{code-block} bash
FLYTE_PLATFORM_URL
FLYTE_CREDENTIALS_CLIENT_ID
FLYTE_CREDENTIALS_CLIENT_SECRET
```

Where `config.yaml` contains a
[flytectl](https://docs.flyte.org/projects/flytectl/en/latest/#configuration)-compatible
config file and `my_client_secret_group` and `my_client_secret_key` are the
{ref}`secret group and key <secrets>` that you've configured for your Flyte
cluster to authenticate via a client key.
See the relevant authentication docs for creating client credentials if using Flyte's internal authorization server.

### Sandbox Flyte cluster execution

When using a sandbox cluster started with `flytectl demo start`, however, the
`client_secret_group` and `client_secret_key` are not required, since the
default sandbox configuration does not require key-based authentication.

```{literalinclude} /examples/advanced_composition/advanced_composition/eager_workflows.py
:caption: advanced_composition/eager_workflows.py
:lines: 130-145
```
When using a sandbox cluster started with `flytectl demo start` no authentication is needed and eager workflows should
just work out of the box.

```{important}
When executing eager workflows on a remote Flyte cluster, it will execute the
latest version of tasks, static workflows, and eager workflows that are on
the `default_project` and `default_domain` as specified in the `FlyteRemote`
object. This means that you need to pre-register all Flyte entities that are
invoked inside of the eager workflow.
same version of tasks, static workflows, and eager workflows that are on
the `project` and `domain` as the eager task itself. If an entity is not found, FlyteRemote will attempt to register
it. Please be aware of this and potential naming errors due to difference in folder paths when running in the container.
Future work may be done to allow execution in another project/domain, but reference entities should always be
correctly reflected and invoked.
```

### Registering and running

Assuming that your `flytekit` code is configured correctly, you will need to
Assuming that your `flytekit` code is configured correctly, you should
register all of the task and subworkflows that are used with your eager
workflow with `pyflyte register`:

Expand Down Expand Up @@ -290,7 +272,7 @@ invoked inside of it.

## Eager workflows on Flyte console

Since eager workflows are an experimental feature, there is currently no
Since eager workflows are still in beta, there is currently no
first-class representation of them on Flyte Console, the UI for Flyte.
When you register an eager workflow, you'll be able to see it in the task view:

Expand Down Expand Up @@ -321,23 +303,20 @@ eager workflow:

## Limitations

As this feature is still experimental, there are a few limitations that you
As this feature is still in beta, there are a few limitations that you
need to keep in mind:

- You cannot invoke {ref}`dynamic workflows <dynamic_workflow>`,
{ref}`map tasks <map_task>`, or {ref}`launch plans <launch_plan>` inside an
eager workflow.
- [Context managers](https://docs.python.org/3/library/contextlib.html) will
only work on locally executed functions within the eager workflow, i.e. using a
context manager to modify the behavior of a task or subworkflow will not work
because they are executed on a completely different pod.
- All exceptions raised by Flyte tasks or workflows will be caught and raised
as an {py:class}`~flytekit.experimental.EagerException` at runtime.
as an {py:class}`~flytekit.exceptions.eager.EagerException` at runtime.
- All task/subworkflow outputs are materialized as Python values, which includes
offloaded types like `FlyteFile`, `FlyteDirectory`, `StructuredDataset`, and
`pandas.DataFrame` will be fully downloaded into the pod running the eager workflow.
This prevents you from incrementally downloading or streaming very large datasets
in eager workflows.
in eager workflows. (Please reach out to the team if you are interested in improving this.)
- Flyte entities that are invoked inside of an eager workflow must be registered
under the same project and domain as the eager workflow itself. The eager
workflow will execute the latest version of these entities.
Expand Down
Loading

0 comments on commit 5b18d35

Please sign in to comment.