Fix bf for composer in version 2 (#367)
* Fix troubles with composer in version 2. Get airflow version and then distinguish code

* Change approach to get airflow version

* Fix troubles with airflow import based on airflow version and kubernetes connection

* Document development process of bigflow

* Fix dagbuilder tests

* Fix test for dagbuilder

* Change documentation

* Change documentation and modify comments
thegunner157 authored May 16, 2023
1 parent 01b653f commit 68dd518
Showing 6 changed files with 26 additions and 12 deletions.
5 changes: 3 additions & 2 deletions bigflow/dagbuilder.py
@@ -66,8 +66,9 @@ def generate_dag_file(
 from airflow.contrib.kubernetes.secret import Secret
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

-# BigFlow assumes that you use (airflow 1.x + composer 1.x) or (airflow 2.x + composer 2.x)
+# To deploy BigFlow project, following requirements options are needed: (airflow 1.x + composer 1.x) or (airflow 2.x + composer >= 2.1.0)
 IS_COMPOSER_2_X = version.version >= '2.0.0'
+IS_AIRFLOW_2_3_X = version.version >= '2.3.0'
 namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'

 default_args = dict(
@@ -135,7 +136,7 @@ def build_dag_operator(
     'secrets': {secrets_definition_list},
     'execution_timeout': {execution_timeout!r},
 }}
-if IS_COMPOSER_2_X:
+if IS_AIRFLOW_2_3_X:
     {pod_operator_params_var}['config_file'] = "/home/airflow/composer_kube_config"
     {pod_operator_params_var}['kubernetes_conn_id'] = "kubernetes_default"

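For reference, the version-gating pattern this template emits boils down to the following sketch. It is a standalone approximation, not the exact generated file: the real DAGs use per-job dicts such as `tjob1_pod_operator_params`, and `version` is imported from `airflow` higher up in the generated module.

```python
from airflow import version  # version.version is the Airflow version string, e.g. '2.3.4'

# Plain string comparisons, exactly as in the generated DAG. They are fine for
# the 1.x/2.x split, though lexicographic order would misplace e.g. '2.10.0'.
IS_COMPOSER_2_X = version.version >= '2.0.0'
IS_AIRFLOW_2_3_X = version.version >= '2.3.0'

# Composer 2 schedules user pods in a dedicated namespace.
namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'

pod_operator_params = {
    'namespace': namespace,
    # ... the remaining KubernetesPodOperator arguments ...
}
if IS_AIRFLOW_2_3_X:
    # On Airflow >= 2.3 the operator reaches the cluster through the
    # Composer-provided kube config and the kubernetes_default connection.
    pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
    pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"
```

Keeping all of the gating inside the params dict means the generated code can pass it to `KubernetesPodOperator(**...)` unchanged on both setups.
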
2 changes: 1 addition & 1 deletion docs/deployment.md
@@ -71,7 +71,7 @@ a new Composer instance. Set only these properties (the others leave blank or de
 * **Machine type** — `n1-standard-2` or higher (we recommend using `n1-standard-2`),
 * **Disk size (GB)** — 50 is enough.

-BigFlow generated DAGs are compatible with Composer 1.X + Airflow 1.X and Composer 2.X + Airflow 2.X.
+BigFlow generated DAGs are compatible with Composer 1.X + Airflow 1.X and Composer >= 2.1.0 + Airflow 2.X.

 That's it, wait until the new Composer Instance is ready.
 It should look like this:
12 changes: 11 additions & 1 deletion docs/development.md
@@ -58,7 +58,17 @@ pip-compile base_frozen.in

 ## Development process

-TODO
+First, set up the infrastructure for your test project.
+
+Then implement your changes in bigflow and push them to a dedicated branch.
+
+Next, in the test project, pin bigflow to the version from your branch:
+just reference that branch in the `requirements.in` file.
+
+Then remove your `requirements.txt` file,
+rebuild the dependencies (using `pip-compile` or the `bf build-requirements` command), uninstall bigflow locally,
+and run `pip install -r resources/requirements.txt`. After that, you can simply run the `bf build` and `bf deploy` commands
+and finally check the generated DAGs in the Airflow UI, which you can access from the Composer instance initialized in the infrastructure project.

 ## Backward compatibility

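The process added above reads roughly as the following shell sketch. The repository URL, branch name, and the `requirements.in` location are illustrative assumptions; only `pip-compile`, `bf build-requirements`, `bf build`, and `bf deploy` are taken from the doc.

```bash
# Point the test project's requirements.in at your bigflow branch
# (illustrative requirement line; adjust the URL and branch to your fork).
echo 'bigflow @ git+https://github.com/allegro/bigflow.git@my-branch' >> resources/requirements.in

# Drop the stale pins and regenerate them.
rm resources/requirements.txt
bf build-requirements   # or: pip-compile resources/requirements.in

# Reinstall bigflow from the freshly pinned requirements.
pip uninstall -y bigflow
pip install -r resources/requirements.txt

# Build and deploy the project, then inspect the DAGs in the Composer Airflow UI.
bf build
bf deploy
```
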
5 changes: 3 additions & 2 deletions test/dagbuilder/my_daily_workflow__dag.py.txt
@@ -16,8 +16,9 @@ except ImportError:
 from airflow.contrib.kubernetes.secret import Secret
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

-# BigFlow assumes that you use (airflow 1.x + composer 1.x) or (airflow 2.x + composer 2.x)
+# To deploy BigFlow project, following requirements options are needed: (airflow 1.x + composer 1.x) or (airflow 2.x + composer >= 2.1.0)
 IS_COMPOSER_2_X = version.version >= '2.0.0'
+IS_AIRFLOW_2_3_X = version.version >= '2.3.0'
 namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'

 default_args = dict(
@@ -55,7 +56,7 @@ tjob1_pod_operator_params = {
     'secrets': [],
     'execution_timeout': datetime.timedelta(seconds=10800),
 }
-if IS_COMPOSER_2_X:
+if IS_AIRFLOW_2_3_X:
     tjob1_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
     tjob1_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"

5 changes: 3 additions & 2 deletions test/dagbuilder/my_parametrized_workflow__dag.py.txt
@@ -16,8 +16,9 @@ except ImportError:
 from airflow.contrib.kubernetes.secret import Secret
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

-# BigFlow assumes that you use (airflow 1.x + composer 1.x) or (airflow 2.x + composer 2.x)
+# To deploy BigFlow project, following requirements options are needed: (airflow 1.x + composer 1.x) or (airflow 2.x + composer >= 2.1.0)
 IS_COMPOSER_2_X = version.version >= '2.0.0'
+IS_AIRFLOW_2_3_X = version.version >= '2.3.0'
 namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'

 default_args = dict(
@@ -58,7 +59,7 @@ tjob1_pod_operator_params = {
     ],
     'execution_timeout': datetime.timedelta(seconds=10800),
 }
-if IS_COMPOSER_2_X:
+if IS_AIRFLOW_2_3_X:
     tjob1_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
     tjob1_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"

9 changes: 5 additions & 4 deletions test/dagbuilder/my_workflow__dag.py.txt
@@ -16,8 +16,9 @@ except ImportError:
 from airflow.contrib.kubernetes.secret import Secret
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

-# BigFlow assumes that you use (airflow 1.x + composer 1.x) or (airflow 2.x + composer 2.x)
+# To deploy BigFlow project, following requirements options are needed: (airflow 1.x + composer 1.x) or (airflow 2.x + composer >= 2.1.0)
 IS_COMPOSER_2_X = version.version >= '2.0.0'
+IS_AIRFLOW_2_3_X = version.version >= '2.3.0'
 namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'

 default_args = dict(
@@ -55,7 +56,7 @@ tjob1_pod_operator_params = {
     'secrets': [],
     'execution_timeout': datetime.timedelta(seconds=10800),
 }
-if IS_COMPOSER_2_X:
+if IS_AIRFLOW_2_3_X:
     tjob1_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
     tjob1_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"

@@ -81,7 +82,7 @@ tjob2_pod_operator_params = {
     'secrets': [],
     'execution_timeout': datetime.timedelta(seconds=10800),
 }
-if IS_COMPOSER_2_X:
+if IS_AIRFLOW_2_3_X:
     tjob2_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
     tjob2_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"

@@ -108,7 +109,7 @@ tjob3_pod_operator_params = {
     'secrets': [],
     'execution_timeout': datetime.timedelta(seconds=10800),
 }
-if IS_COMPOSER_2_X:
+if IS_AIRFLOW_2_3_X:
     tjob3_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
     tjob3_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"
