-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmulti-tasking.qmd
116 lines (84 loc) · 4.56 KB
/
multi-tasking.qmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
---
title: Multi-tasking
number-depth: 2
---
Sometimes orchestration involves repetition. For example, you might have a DAG with 3 different tasks that fetch stock data for 3 different stock symbols. To create this DAG, you'd likely use a `for` loop. You can achieve this `for` loop style task generation within gusty using some special frontmatter blocks:
- `multi_task_spec` - For iterating over arguments to be passed to an operator.
- `python_callable_partials` - For iterating over arguments to be passed to the function assigned to a `python_callable` argument.
In both `multi_task_spec` and `python_callable_partials`, the keys below each block will be the task id for a given task, and the arguments below each task id will be passed to that operator or callable, respectively.
We'll look at examples of each below.
## `multi_task_spec`
Imagine we want three `BashOperator` tasks to echo "hi", "hey", and "hello world".
We can define all three tasks in a single Task Definition File, which we'll call `multi_greeing.yml`. The name of the Task Definition File is arbitrary. Here are its contents:
```yaml
operator: airflow.operators.bash.BashOperator
bash_command: echo $GREETING
multi_task_spec:
say_hi:
env:
GREETING: hi
say_hey:
env:
GREETING: hey
say_hello_world:
env:
GREETING: hello
bash_command: echo $GREETING world
```
The above Task Definition File will create three tasks: `say_hi`, `say_hey`, and `say_hello_world`. The tasks `say_hi` and `say_hey` both inherit the same `bash_command`, but have different `env` arguments. The `say_hello_world` task also contains its own `env` argument, but goes a step further as to define its own `bash_command`.
This `multi_task_spec` produces the following graph:
![](media/images/multi-tasking/multi-hello.png){fig-align="center"}
This powerful syntax allows you to keep your task definitions [DRY](https://en.wikipedia.org/wiki/Don%27t_repeat_yourself). In this example, every task has a dedicated, unique `env` argument. In the case of `say_hi` and `say_hey`, they share a common `bash_command`. In the case of `say_hello_world`, it gets its own `env` and `bash_command`. Very flexible!
## `python_callable_partials`
Similar to `multi_task_spec`, `python_callable_partials` allows you to generate multiple tasks in a single file, except instead of passing arguments to an operator, you pass arguments directly to a `python_callable`.
In the example Task Definition File below, we'll create a few tasks to fetch the past year's stock data from [yfinance](https://pypi.org/project/yfinance/) for three different stock symbols: `AMZN`, `GOOG`, and `MSFT`.
```python
# ---
# python_callable: main
# python_callable_partials:
# get_amzn:
# symbol: AMZN
# get_goog:
# symbol: GOOG
# get_msft:
# symbol: MSFT
# ---
def main(symbol):
from yfinance import Ticker
stock = Ticker(symbol)
history = stock.history(period='1y', interval='1d').reset_index()
history["Symbol"] = symbol
print(history.head())
```
The above Task Definition File will create three tasks: `get_amzn`, `get_goog`, and `get_msft`. Each task will have its respective `symbol` passed to the the `main` function.
![](media/images/multi-tasking/multi-stock.png){fig-align="center"}
## Mixing It Up
`multi_task_spec` and `python_callable_partials` are non-exclusive, so you can mix and match configuration as needed.
Let's build upon our `yfinance` example, and instead of using the default `PythonOperator`, let's use the [PythonVirtualenvOperator](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/python/index.html#airflow.operators.python.PythonVirtualenvOperator), and change the requirements for our `get_amzn` task.
```python
# ---
# operator: airflow.operators.python.PythonVirtualenvOperator
# python_callable: main
# python_callable_partials:
# get_amzn:
# symbol: AMZN
# get_goog:
# symbol: GOOG
# get_msft:
# symbol: MSFT
# multi_task_spec:
# get_amzn:
# requirements:
# - yfinance==0.1.96
# ---
def main(symbol):
from yfinance import Ticker
stock = Ticker(symbol)
history = stock.history(period='1y', interval='1d').reset_index()
history["Symbol"] = symbol
print(history.head())
```
In the above example, we changed two things:
- We are now explicitly using the `PythonVirtualenvOperator` via the `operator` entry.
- Our `get_amzn` task also gets an entry in the `multi_task_spec` block, specifying a list of `requirements` just for our `get_amzn` task.
With both `multi_task_spec` and `python_callable_partials` working together, you can pretty much iterate over anything!