Apache Airflow is a platform to programmatically author, schedule and monitor workflows.
This repository codifies the Airflow cluster that is deployed at workflow.telemetry.mozilla.org (behind SSO) and commonly referred to as "WTMO" or simply "Airflow".
Some links relevant to users and developers of WTMO:
- The dags directory in this repository contains some custom DAG definitions
- Many of the DAGs registered with WTMO don't live in this repository, but are instead generated from ETL task definitions in bigquery-etl
- The Data SRE team maintains a WTMO Developer Guide (behind SSO)
Forking workflow is required ONLY for contributors without write access.
This repo enforces Conventional Commit style via the GitHub action Semantic PRs.
See Airflow's Best Practices guide for help writing DAGs.
⚠ Warning: Do not import resources from the dags directory in DAG definition files ⚠
As an example, if you have dags/dag_a.py and dags/dag_b.py and want to use a helper
function in both DAG definition files, define the helper function in the utils directory
such as:
utils/helper.py

def helper_function():
    return "Help"

dags/dag_a.py

from airflow import DAG
from utils.helper import helper_function

with DAG("dag_a", ...):
    ...

dags/dag_b.py

from airflow import DAG
from utils.helper import helper_function

with DAG("dag_b", ...):
    ...

WTMO deployments use git-sync sidecars to synchronize DAG files from multiple repositories via telemetry-airflow-dags using git submodules. The git-sync sidecar pattern results in the following directory structure once deployed:
airflow
├─ dags
│  └── repo
│      └── telemetry-airflow-dags
│          ├── <submodule repo_a>
│          │    └── dags
│          │        └── <dag files>
│          ├── <submodule repo_b>
│          │    └── dags
│          │        └── <dag files>
│          └── <submodule repo_c>
│               └── dags
│                   └── <dag files>
├─ utils
│  └── ...
└─ plugins
   └── ...
Hence, defining helper_function() in dags/dag_a.py and
importing the function in dags/dag_b.py as from dags.dag_a import helper_function
will not work after deployment because of the directory structure required for
git-sync sidecars.
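The import behaviour described above can be reproduced without Airflow. The following is a minimal sketch, not the deployment itself: it builds a toy copy of the deployed tree in a temporary directory and assumes the top-level airflow directory is on the Python import path (which is what a working `from utils.helper import ...` suggests). The submodule name `repo_a` and the function names are illustrative.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def build_layout(root: Path) -> None:
    """Create a toy version of the deployed directory tree under root."""
    utils = root / "utils"
    utils.mkdir(parents=True)
    (utils / "__init__.py").write_text("")
    (utils / "helper.py").write_text('def helper_function():\n    return "Help"\n')

    # After git-sync, DAG files sit several levels below airflow/dags.
    # "repo_a" stands in for a real submodule name.
    dag_dir = root / "dags" / "repo" / "telemetry-airflow-dags" / "repo_a" / "dags"
    dag_dir.mkdir(parents=True)
    (dag_dir / "dag_a.py").write_text('def helper_function():\n    return "Help"\n')


def can_import(module_name: str) -> bool:
    """Return True if the module can be imported, False otherwise."""
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False


def check_deployed_imports() -> dict:
    """Try both import styles against the toy deployed layout."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp) / "airflow"
        build_layout(root)
        sys.path.insert(0, str(root))  # assumption: airflow/ is on the import path
        importlib.invalidate_caches()
        try:
            return {name: can_import(name) for name in ("utils.helper", "dags.dag_a")}
        finally:
            # Undo the changes to interpreter state made for this demo.
            sys.path.remove(str(root))
            for name in ("utils", "utils.helper", "dags"):
                sys.modules.pop(name, None)


if __name__ == "__main__":
    print(check_deployed_imports())
```

In this toy layout `utils.helper` resolves, while `dags.dag_a` does not, because `dag_a.py` is no longer a direct child of the dags directory.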
This app is built and deployed with
docker and
docker-compose.
Dependencies are managed with
pip-tools pip-compile.
You'll also need to install PostgreSQL to build the database container.
⚠ Make sure you use the right Python version. Refer to the Dockerfile for the currently supported Python version ⚠
You can install the project dependencies locally to run tests with Pytest. We use the official Airflow constraints file to simplify Airflow dependency management. Install dependencies locally using the following command:
make pip-install-local

Add new Python dependencies into requirements.in or requirements-dev.in, then execute the following commands:
make pip-compile
make pip-install-local

Build the Airflow image with
make build

Assuming you're using Docker Desktop for macOS, start the docker service, click the docker icon in the menu bar, click on preferences and change the available memory to 4GB.
To deploy the Airflow container on the docker engine, with its required dependencies, run:
make build
make up

Tasks often require credentials to access external services. For example, one may choose to store API keys in an Airflow connection or variable. These variables are sure to exist in production but are often not mirrored locally for logistical reasons. Providing a dummy variable is the preferred way to keep the local development environment up to date.
Update the resources/dev_variables.env and resources/dev_connections.env with appropriate strings to
prevent broken workflows.
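One way to catch a missing dummy variable before a DAG breaks is to compare the names a DAG reads against what the env file defines. The sketch below makes two assumptions that may not match this repository: that the env file consists of `KEY=VALUE` lines, and that variables follow Airflow's `AIRFLOW_VAR_<NAME>` environment variable convention. The function names and sample keys are hypothetical.

```python
def parse_env_lines(text: str) -> dict:
    """Parse KEY=VALUE lines, skipping blanks and # comments."""
    entries = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        entries[key.strip()] = value.strip()
    return entries


def missing_variables(env_text: str, required: list) -> list:
    """Return the required variable names that have no AIRFLOW_VAR_ entry."""
    defined = parse_env_lines(env_text)
    return [
        name
        for name in required
        if f"AIRFLOW_VAR_{name.upper()}" not in defined
    ]


if __name__ == "__main__":
    # Hypothetical file contents; real keys will differ.
    sample = """
    # dummy values for local development
    AIRFLOW_VAR_EXAMPLE_API_KEY=dummy
    """
    print(missing_variables(sample, ["example_api_key", "other_token"]))
```

To run it against the real file, read resources/dev_variables.env and pass its text as `env_text`.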
You can now connect to your local Airflow web console at
http://localhost:8080/.
All DAGs are paused by default for local instances and our staging instance of Airflow. In order to submit a DAG via the UI, you'll need to toggle the DAG from "Off" to "On". You'll likely want to toggle the DAG back to "Off" as soon as your desired task starts running.
To test GKE jobs, see https://mozilla-hub.atlassian.net/wiki/spaces/SRE/pages/27922811/WTMO+Developer+Guide for more details.
make build && make up
make gke
When done:
make clean-gke
From there, connect to Airflow and enable your job.
Dataproc jobs run on a self-contained Dataproc cluster, created by Airflow.
To test these jobs, you'll need a sandbox account and corresponding service account. For information on creating that, see "Testing GKE Jobs". Your service account will need Dataproc and GCS permissions (and BigQuery, if you're connecting to it). Note: Dataproc requires "Dataproc/Dataproc Worker" as well as Compute Admin permissions. You'll need to ensure that the Dataproc API is enabled in your sandbox project.
Ensure that your Dataproc job has a configurable project to write to.
Set the project in the DAG entry to be configured based on development environment;
see the ltv.py job for an example of that.
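As a rough illustration of a project that is configured based on the environment, here is a minimal sketch. It is not how ltv.py does it: the environment variable names `DEPLOY_ENVIRONMENT` and `SANDBOX_PROJECT_ID`, the project id, and the function name are all hypothetical placeholders.

```python
import os
from typing import Mapping, Optional

# Hypothetical production project id, used for illustration only.
PROD_PROJECT = "example-prod-project"


def resolve_project(environ: Optional[Mapping[str, str]] = None) -> str:
    """Pick the GCP project a job should write to.

    Production uses a fixed project; any other environment must name a
    sandbox project explicitly, so a local run cannot write to production
    by accident.
    """
    env = os.environ if environ is None else environ
    # DEPLOY_ENVIRONMENT and SANDBOX_PROJECT_ID are assumed variable names.
    if env.get("DEPLOY_ENVIRONMENT", "dev") == "prod":
        return PROD_PROJECT
    sandbox = env.get("SANDBOX_PROJECT_ID")
    if not sandbox:
        raise ValueError("Set SANDBOX_PROJECT_ID when running outside production")
    return sandbox


if __name__ == "__main__":
    print(resolve_project({"SANDBOX_PROJECT_ID": "my-sandbox"}))
```

The resolved value can then be passed to whichever operator argument names the target project in the DAG.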
From there, run the following:
make build && make up
./bin/add_gcp_creds $GOOGLE_APPLICATION_CREDENTIALS google_cloud_airflow_dataproc

You can then connect to Airflow locally. Enable your DAG and see that it runs correctly.
Some useful docker tricks for development and debugging:
make clean
# Remove any leftover docker volumes:
docker volume rm $(docker volume ls -qf dangling=true)
# Purge docker volumes (helps with postgres container failing to start)
# Careful, as this will purge all local volumes not used by at least one container.
docker volume prune

This repository was structured to be deployed using the official Airflow Helm Chart. See the Production Guide for best practices.
Production and stage deployments are automatically triggered for every merge to the main branch:
- A new Docker image is built from the main branch merge commit, tagged as main- suffixed with the commit's short SHA, and then deployed to production and stage via ArgoCD.
Dev deployments are automatically triggered for every new Git tag that starts with dev-:
- A new Docker image is built from the tagged commit, tagged to match the Git dev-... tag, and then deployed to dev via ArgoCD.
- To create a Git tag for your current commit and push it, you can run git tag <tagname> followed by git push <remote> <tagname>.
- You should generally use a unique dev-... tag every time, because the deployments are triggered when ArgoCD detects a different latest dev-... image tag than what it last deployed.
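A simple way to get a unique dev-... tag each time is to append a timestamp. This is only a sketch of one possible naming scheme; the label and timestamp format are assumptions, and the only requirement stated above is that the tag starts with dev- and differs from the previous one.

```python
import re
from datetime import datetime, timezone
from typing import Optional


def make_dev_tag(label: str, now: Optional[datetime] = None) -> str:
    """Build a tag such as dev-my-fix-20240102030405.

    The label is lowercased and reduced to letters, digits and hyphens;
    the UTC timestamp keeps successive tags distinct.
    """
    now = now or datetime.now(timezone.utc)
    slug = re.sub(r"[^a-z0-9]+", "-", label.lower()).strip("-")
    stamp = now.strftime("%Y%m%d%H%M%S")
    # Skip the slug if the label contained no usable characters.
    return "-".join(part for part in ("dev", slug, stamp) if part)


if __name__ == "__main__":
    print(make_dev_tag("my fix"))
```

The resulting string can be used as <tagname> in the git tag and git push commands above.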
Implementation details: