
Fix bf for composer in version 2 #367


Merged · 27 commits · May 16, 2023
Commits:
a36548b  Fix troubles with composer in version 2. Get airflow version and then… (thegunner157, Apr 18, 2023)
9e07ab6  Fix troubles with composer in version 2. Get airflow version and then… (thegunner157, Apr 19, 2023)
65a4130  Change approach to get airflow version (thegunner157, Apr 19, 2023)
e3925c5  Change approach to get airflow version (thegunner157, Apr 21, 2023)
3ba711b  Change approach to get airflow version (thegunner157, Apr 24, 2023)
d985051  Change approach to get airflow version (thegunner157, Apr 24, 2023)
6195eb6  Change approach to get airflow version (thegunner157, Apr 25, 2023)
8189fbb  Fix troubles with airflow import based on airflow version and kuberne… (thegunner157, Apr 26, 2023)
b71cbc0  Document development process of bigflow (thegunner157, Apr 26, 2023)
de32fc8  Fix dagbuilder tests (thegunner157, Apr 27, 2023)
df1f47b  Merge branch 'master' into fix_bf_for_composer_in_version_2 (thegunner157, May 5, 2023)
f99fe15  Fix test for dagbuilder (thegunner157, May 5, 2023)
7356c34  Change documentation (thegunner157, May 5, 2023)
6d6d50c  Fix troubles with composer in version 2. Get airflow version and then… (thegunner157, Apr 18, 2023)
4d51977  Fix troubles with composer in version 2. Get airflow version and then… (thegunner157, Apr 19, 2023)
1ed8f60  Change approach to get airflow version (thegunner157, Apr 19, 2023)
9cca8de  Change approach to get airflow version (thegunner157, Apr 21, 2023)
05a3d22  Change approach to get airflow version (thegunner157, Apr 24, 2023)
6dec278  Change approach to get airflow version (thegunner157, Apr 24, 2023)
f0ebdc6  Change approach to get airflow version (thegunner157, Apr 25, 2023)
124ecec  Fix troubles with airflow import based on airflow version and kuberne… (thegunner157, Apr 26, 2023)
bce94fc  Document development process of bigflow (thegunner157, Apr 26, 2023)
35cf52f  Fix dagbuilder tests (thegunner157, Apr 27, 2023)
731a894  Fix test for dagbuilder (thegunner157, May 5, 2023)
ed18252  Change documentation (thegunner157, May 5, 2023)
846139b  Change documentation and modify comments (thegunner157, May 15, 2023)
09f1c09  Merge remote-tracking branch 'origin/fix_bf_for_composer_in_version_2… (thegunner157, May 15, 2023)
Changes from all commits:
5 changes: 3 additions & 2 deletions bigflow/dagbuilder.py
@@ -66,8 +66,9 @@ def generate_dag_file(
from airflow.contrib.kubernetes.secret import Secret
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# BigFlow assumes that you use (airflow 1.x + composer 1.x) or (airflow 2.x + composer 2.x)
# To deploy a BigFlow project, one of the following environment combinations is required: (airflow 1.x + composer 1.x) or (airflow 2.x + composer >= 2.1.0)
IS_COMPOSER_2_X = version.version >= '2.0.0'
IS_AIRFLOW_2_3_X = version.version >= '2.3.0'
namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'

default_args = dict(
@@ -135,7 +136,7 @@ def build_dag_operator(
'secrets': {secrets_definition_list},
'execution_timeout': {execution_timeout!r},
}}
if IS_COMPOSER_2_X:
if IS_AIRFLOW_2_3_X:
{pod_operator_params_var}['config_file'] = "/home/airflow/composer_kube_config"
{pod_operator_params_var}['kubernetes_conn_id'] = "kubernetes_default"

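Pieced together, the hunks above give every generated DAG file the following shape. This is a consolidated sketch rather than the literal generated source, and the Airflow 2.x import paths in the `try` branch are an assumption, since only the fallback branch is visible in this diff:

```python
from airflow import version

try:
    # Airflow 2.x (Composer 2.x) import paths -- assumed, not shown in this diff.
    from airflow.kubernetes.secret import Secret
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
except ImportError:
    # Airflow 1.x (Composer 1.x) fallback, as shown in the diff context above.
    from airflow.contrib.kubernetes.secret import Secret
    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# Composer 2.x always runs Airflow 2.x, so the Airflow version string stands in
# for the Composer major version. Note this is a lexicographic string comparison,
# which would misorder e.g. '2.10.0' < '2.3.0'.
IS_COMPOSER_2_X = version.version >= '2.0.0'
IS_AIRFLOW_2_3_X = version.version >= '2.3.0'
namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'

pod_operator_params = {
    'namespace': namespace,
    # ... image, secrets, execution_timeout, etc.
}
if IS_AIRFLOW_2_3_X:
    # From Airflow 2.3 on, the pod operator reaches the cluster through the
    # Composer kube config and the default Kubernetes connection.
    pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
    pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"
```

Note the two separate gates: the namespace switches on the Composer 1.x/2.x split, while the kube-config parameters switch on Airflow >= 2.3, which presumably tracks the `composer >= 2.1.0` requirement stated in the comment.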
2 changes: 1 addition & 1 deletion docs/deployment.md
@@ -71,7 +71,7 @@ a new Composer instance. Set only these properties (the others leave blank or de
* **Machine type** — `n1-standard-2` or higher (we recommend using `n1-standard-2`),
* **Disk size (GB)** — 50 is enough.

BigFlow generated DAGs are compatible with Composer 1.X + Airflow 1.X and Composer 2.X + Airflow 2.X.
BigFlow generated DAGs are compatible with Composer 1.X + Airflow 1.X and Composer >= 2.1.0 + Airflow 2.X.

That's it, wait until the new Composer Instance is ready.
It should look like this:
12 changes: 11 additions & 1 deletion docs/development.md
@@ -58,7 +58,17 @@ pip-compile base_frozen.in

## Development process

TODO
At the beginning, set up the infrastructure for your test project.

Then implement your changes in bigflow and push them to a dedicated feature branch.

After that, point the test project's `requirements.in` file at the bigflow version from your branch (see the example below).

Then remove your `requirements.txt` file and build the dependencies again (using `pip-compile` or the `bf build-requirements` command). Next, uninstall bigflow locally and run `pip install -r resources/requirements.txt`. After that you can simply run the `bf build` and `bf deploy` commands, and finally check the generated DAGs in the Airflow UI, which you can open from the Composer instance that was initialized in the infrastructure project.
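A hypothetical example of that `requirements.in` pin, assuming a pip direct reference against the upstream repository (substitute your own fork and branch name):

```
bigflow @ git+https://github.com/allegro/bigflow.git@your_feature_branch
```

After rebuilding, `resources/requirements.txt` pins bigflow to your branch instead of a PyPI release, so the subsequent `bf build` and `bf deploy` exercise your changes.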

## Backward compatibility

5 changes: 3 additions & 2 deletions test/dagbuilder/my_daily_workflow__dag.py.txt
@@ -16,8 +16,9 @@ except ImportError:
from airflow.contrib.kubernetes.secret import Secret
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# BigFlow assumes that you use (airflow 1.x + composer 1.x) or (airflow 2.x + composer 2.x)
# To deploy a BigFlow project, one of the following environment combinations is required: (airflow 1.x + composer 1.x) or (airflow 2.x + composer >= 2.1.0)
IS_COMPOSER_2_X = version.version >= '2.0.0'
IS_AIRFLOW_2_3_X = version.version >= '2.3.0'
namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'

default_args = dict(
@@ -55,7 +56,7 @@ tjob1_pod_operator_params = {
'secrets': [],
'execution_timeout': datetime.timedelta(seconds=10800),
}
if IS_COMPOSER_2_X:
if IS_AIRFLOW_2_3_X:
tjob1_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
tjob1_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"

5 changes: 3 additions & 2 deletions test/dagbuilder/my_parametrized_workflow__dag.py.txt
@@ -16,8 +16,9 @@ except ImportError:
from airflow.contrib.kubernetes.secret import Secret
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# BigFlow assumes that you use (airflow 1.x + composer 1.x) or (airflow 2.x + composer 2.x)
# To deploy a BigFlow project, one of the following environment combinations is required: (airflow 1.x + composer 1.x) or (airflow 2.x + composer >= 2.1.0)
IS_COMPOSER_2_X = version.version >= '2.0.0'
IS_AIRFLOW_2_3_X = version.version >= '2.3.0'
namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'

default_args = dict(
@@ -58,7 +59,7 @@ tjob1_pod_operator_params = {
],
'execution_timeout': datetime.timedelta(seconds=10800),
}
if IS_COMPOSER_2_X:
if IS_AIRFLOW_2_3_X:
tjob1_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
tjob1_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"

9 changes: 5 additions & 4 deletions test/dagbuilder/my_workflow__dag.py.txt
@@ -16,8 +16,9 @@ except ImportError:
from airflow.contrib.kubernetes.secret import Secret
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# BigFlow assumes that you use (airflow 1.x + composer 1.x) or (airflow 2.x + composer 2.x)
# To deploy a BigFlow project, one of the following environment combinations is required: (airflow 1.x + composer 1.x) or (airflow 2.x + composer >= 2.1.0)
IS_COMPOSER_2_X = version.version >= '2.0.0'
IS_AIRFLOW_2_3_X = version.version >= '2.3.0'
namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'

default_args = dict(
@@ -55,7 +56,7 @@ tjob1_pod_operator_params = {
'secrets': [],
'execution_timeout': datetime.timedelta(seconds=10800),
}
if IS_COMPOSER_2_X:
if IS_AIRFLOW_2_3_X:
tjob1_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
tjob1_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"

@@ -81,7 +82,7 @@ tjob2_pod_operator_params = {
'secrets': [],
'execution_timeout': datetime.timedelta(seconds=10800),
}
if IS_COMPOSER_2_X:
if IS_AIRFLOW_2_3_X:
tjob2_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
tjob2_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"

@@ -108,7 +109,7 @@ tjob3_pod_operator_params = {
'secrets': [],
'execution_timeout': datetime.timedelta(seconds=10800),
}
if IS_COMPOSER_2_X:
if IS_AIRFLOW_2_3_X:
tjob3_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
tjob3_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"
