From 68dd51820153d32830233abb554fac4f3b2f2d03 Mon Sep 17 00:00:00 2001
From: thegunner157
Date: Tue, 16 May 2023 09:21:34 +0200
Subject: [PATCH] Fix bf for composer in version 2 (#367)

* Fix troubles with composer in version 2. Get airflow version and then distinguish code
* Fix troubles with composer in version 2. Get airflow version and then distinguish code
* Change approach to get airflow version
* Change approach to get airflow version
* Change approach to get airflow version
* Change approach to get airflow version
* Change approach to get airflow version
* Fix troubles with airflow import based on airflow version and kubernetes connection
* Document development process of bigflow
* Fix dagbuilder tests
* Fix test for dagbuilder
* Change documentation
* Fix troubles with composer in version 2. Get airflow version and then distinguish code
* Fix troubles with composer in version 2. Get airflow version and then distinguish code
* Change approach to get airflow version
* Change approach to get airflow version
* Change approach to get airflow version
* Change approach to get airflow version
* Change approach to get airflow version
* Fix troubles with airflow import based on airflow version and kubernetes connection
* Document development process of bigflow
* Fix dagbuilder tests
* Fix test for dagbuilder
* Change documentation
* Change documentation and modify comments
---
 bigflow/dagbuilder.py | 5 +++--
 docs/deployment.md | 2 +-
 docs/development.md | 12 +++++++++++-
 test/dagbuilder/my_daily_workflow__dag.py.txt | 5 +++--
 test/dagbuilder/my_parametrized_workflow__dag.py.txt | 5 +++--
 test/dagbuilder/my_workflow__dag.py.txt | 9 +++++----
 6 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/bigflow/dagbuilder.py b/bigflow/dagbuilder.py
index 5ab50618..b9417732 100644
--- a/bigflow/dagbuilder.py
+++ b/bigflow/dagbuilder.py
@@ -66,8 +66,9 @@ def generate_dag_file(
 from airflow.contrib.kubernetes.secret import Secret
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
- # BigFlow assumes that you use (airflow 1.x + composer 1.x) or (airflow 2.x + composer 2.x)
+ # To deploy a BigFlow project, you need either (airflow 1.x + composer 1.x) or (airflow 2.x + composer >= 2.1.0)
 IS_COMPOSER_2_X = version.version >= '2.0.0'
+ IS_AIRFLOW_2_3_X = version.version >= '2.3.0'
 namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'
 default_args = dict(
@@ -135,7 +136,7 @@ def build_dag_operator(
 'secrets': {secrets_definition_list},
 'execution_timeout': {execution_timeout!r},
 }}
- if IS_COMPOSER_2_X:
+ if IS_AIRFLOW_2_3_X:
 {pod_operator_params_var}['config_file'] = "/home/airflow/composer_kube_config"
 {pod_operator_params_var}['kubernetes_conn_id'] = "kubernetes_default"
diff --git a/docs/deployment.md b/docs/deployment.md
index 4a682c29..7e2f5e48 100644
--- a/docs/deployment.md
+++ b/docs/deployment.md
@@ -71,7 +71,7 @@ a new Composer instance. Set only these properties (the others leave blank or de
 * **Machine type** — `n1-standard-2` or higher (we recommend using `n1-standard-2`),
 * **Disk size (GB)** — 50 is enough.
-BigFlow generated DAGs are compatible with Composer 1.X + Airflow 1.X and Composer 2.X + Airflow 2.X.
+BigFlow-generated DAGs are compatible with Composer 1.X + Airflow 1.X and Composer >= 2.1.0 + Airflow 2.X.
 That's it, wait until the new Composer Instance is ready.
 It should look like this:
diff --git a/docs/development.md b/docs/development.md
index b0a3ae27..8d1d9cf0 100644
--- a/docs/development.md
+++ b/docs/development.md
@@ -58,7 +58,17 @@ pip-compile base_frozen.in
 ## Development process
-TODO
+First, set up the infrastructure for your test project.
+
+Then implement your changes in bigflow and push them to a dedicated branch.
+
+Next, point your test project at the bigflow version from your branch:
+to do that, set the bigflow dependency in the `requirements.in` file to your branch.
+
+Then remove your `requirements.txt` file,
+rebuild the dependencies (using `pip-compile` or the `bf build-requirements` command), uninstall bigflow locally,
+and run `pip install -r resources/requirements.txt`. After that, you can simply run the `bf build` and `bf deploy` commands
+and finally check the generated DAGs in the Airflow UI, which you can access from the Composer environment created in the infrastructure project.
 ## Backward compatibility
diff --git a/test/dagbuilder/my_daily_workflow__dag.py.txt b/test/dagbuilder/my_daily_workflow__dag.py.txt
index 3d2e0386..0aec640d 100644
--- a/test/dagbuilder/my_daily_workflow__dag.py.txt
+++ b/test/dagbuilder/my_daily_workflow__dag.py.txt
@@ -16,8 +16,9 @@ except ImportError:
 from airflow.contrib.kubernetes.secret import Secret
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-# BigFlow assumes that you use (airflow 1.x + composer 1.x) or (airflow 2.x + composer 2.x)
+# To deploy a BigFlow project, you need either (airflow 1.x + composer 1.x) or (airflow 2.x + composer >= 2.1.0)
 IS_COMPOSER_2_X = version.version >= '2.0.0'
+IS_AIRFLOW_2_3_X = version.version >= '2.3.0'
 namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'
 default_args = dict(
@@ -55,7 +56,7 @@ tjob1_pod_operator_params = {
 'secrets': [],
 'execution_timeout': datetime.timedelta(seconds=10800),
 }
-if IS_COMPOSER_2_X:
+if IS_AIRFLOW_2_3_X:
 tjob1_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
 tjob1_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"
diff --git a/test/dagbuilder/my_parametrized_workflow__dag.py.txt b/test/dagbuilder/my_parametrized_workflow__dag.py.txt
index d76963c3..070e9b6f 100644
--- a/test/dagbuilder/my_parametrized_workflow__dag.py.txt
+++ b/test/dagbuilder/my_parametrized_workflow__dag.py.txt
@@ -16,8 +16,9 @@ except ImportError:
 from airflow.contrib.kubernetes.secret import Secret
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-# BigFlow assumes that you use (airflow 1.x + composer 1.x) or (airflow 2.x + composer 2.x)
+# To deploy a BigFlow project, you need either (airflow 1.x + composer 1.x) or (airflow 2.x + composer >= 2.1.0)
 IS_COMPOSER_2_X = version.version >= '2.0.0'
+IS_AIRFLOW_2_3_X = version.version >= '2.3.0'
 namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'
 default_args = dict(
@@ -58,7 +59,7 @@ tjob1_pod_operator_params = {
 ],
 'execution_timeout': datetime.timedelta(seconds=10800),
 }
-if IS_COMPOSER_2_X:
+if IS_AIRFLOW_2_3_X:
 tjob1_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
 tjob1_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"
diff --git a/test/dagbuilder/my_workflow__dag.py.txt b/test/dagbuilder/my_workflow__dag.py.txt
index b0c0663d..028c8aa8 100644
--- a/test/dagbuilder/my_workflow__dag.py.txt
+++ b/test/dagbuilder/my_workflow__dag.py.txt
@@ -16,8 +16,9 @@ except ImportError:
 from airflow.contrib.kubernetes.secret import Secret
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
-# BigFlow assumes that you use (airflow 1.x + composer 1.x) or (airflow 2.x + composer 2.x)
+# To deploy a BigFlow project, you need either (airflow 1.x + composer 1.x) or (airflow 2.x + composer >= 2.1.0)
 IS_COMPOSER_2_X = version.version >= '2.0.0'
+IS_AIRFLOW_2_3_X = version.version >= '2.3.0'
 namespace = 'composer-user-workloads' if IS_COMPOSER_2_X else 'default'
 default_args = dict(
@@ -55,7 +56,7 @@ tjob1_pod_operator_params = {
 'secrets': [],
 'execution_timeout': datetime.timedelta(seconds=10800),
 }
-if IS_COMPOSER_2_X:
+if IS_AIRFLOW_2_3_X:
 tjob1_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
 tjob1_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"
@@ -81,7 +82,7 @@ tjob2_pod_operator_params = {
 'secrets': [],
 'execution_timeout': datetime.timedelta(seconds=10800),
 }
-if IS_COMPOSER_2_X:
+if IS_AIRFLOW_2_3_X:
 tjob2_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
 tjob2_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"
@@ -108,7 +109,7 @@ tjob3_pod_operator_params = {
 'secrets': [],
 'execution_timeout': datetime.timedelta(seconds=10800),
 }
-if IS_COMPOSER_2_X:
+if IS_AIRFLOW_2_3_X:
 tjob3_pod_operator_params['config_file'] = "/home/airflow/composer_kube_config"
 tjob3_pod_operator_params['kubernetes_conn_id'] = "kubernetes_default"
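
For anyone trying out the development loop documented in docs/development.md above, the steps boil down to roughly the command sequence below. This is only a sketch, not part of the patch: the branch name `my-feature-branch`, the repository URL, and the assumption that `requirements.in` lives under `resources/` are placeholders for illustration.

```shell
# 1. In the test project, point the bigflow dependency at your branch,
#    e.g. in resources/requirements.in (placeholder URL and branch name):
#      bigflow @ git+https://github.com/allegro/bigflow.git@my-feature-branch

# 2. Drop the old lock file and rebuild the pinned dependencies.
rm resources/requirements.txt
bf build-requirements            # or: pip-compile resources/requirements.in

# 3. Swap the locally installed bigflow for the branch version.
pip uninstall -y bigflow
pip install -r resources/requirements.txt

# 4. Build and deploy the project, then inspect the generated DAGs in the Airflow UI.
bf build
bf deploy
```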