Example repository showing how to leverage Prefect, dbt and Snowflake together to build a data platform. This project includes automation that makes it easy to use this repository as a template to get started and adjust it based on the needs of your team.
How to Build a Modular Data Stack — Data Platform with Prefect, dbt and Snowflake
The following flows are parametrized and allow you to easily trigger any dbt command (both locally and from the Prefect UI):
- dbt debug = test connection and profile
- dbt compile = compile the target/manifest.json file
- dbt deps = install dependencies
- dbt seed = ingest raw data from seed files
- dbt run = run all models
- dbt run --select xyz = run selected model(s)
- dbt test = run tests
- dbt build = run seeds, models, tests, and snapshots in DAG order
- dbt build --select result:error+ --defer --state ./target - use the --select flag together with --defer and --state to manually rerun your dbt DAG from a failed node
- dbt run --select failed_model+ - if you know which model failed and you want to directly trigger a run from that failed model and all downstream dependencies
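As an illustration, a parametrized flow wrapping these commands might look roughly like the sketch below (the flow name, default command, and project path are placeholders rather than the repository's exact code):

```python
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command


@flow
def dbt_jaffle_shop(dbt_command: str = "dbt build", project_dir: str = "dbt/jaffle_shop"):
    # Any of the commands listed above can be passed as the dbt_command parameter,
    # either when running locally or when triggering a run from the Prefect UI.
    return trigger_dbt_cli_command(command=dbt_command, project_dir=project_dir)


if __name__ == "__main__":
    dbt_jaffle_shop(dbt_command="dbt run --select orders")
```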
The Prefect task prefect_dbt.cli.commands.trigger_dbt_cli_command requires that the dbt project_dir is located either in the root project directory or in a path that you explicitly provide.
To avoid any issues with paths, it's best to run the flow as a script rather than from IPython. Examples:
python flows/transformation/attribution/dbt_build.py
python flows/transformation/attribution/dbt_run_from_manifest.py
python flows/transformation/jaffle_shop/dbt_build.py
python flows/transformation/jaffle_shop/dbt_cloned_repo.py
python flows/transformation/jaffle_shop/dbt_run_from_manifest.py
You can create deployments from the automated scripts.
Note that you can add tags and a schedule directly from the UI, so we deliberately skipped those in the CLI commands.
python deploy_locally.py
python deploy_locally_docker.py
python deploy_locally_k8s.py
python deploy_docker_s3.py
python deploy_k8s_s3.py
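For context, a deployment script of this kind could be sketched as follows using the Prefect 2 Deployment API; the combination of an S3 storage block with a Kubernetes infrastructure block and the block names below are illustrative assumptions, not the exact contents of the scripts:

```python
from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect.infrastructure import KubernetesJob

from flows.ingestion.ingest_jaffle_shop import raw_data_jaffle_shop

# Build a deployment that stores flow code in S3 and runs it as a Kubernetes job.
deployment = Deployment.build_from_flow(
    flow=raw_data_jaffle_shop,
    name="default",
    work_queue_name="default",
    storage=S3.load("default"),                    # assumed storage block name
    infrastructure=KubernetesJob.load("default"),  # assumed infrastructure block name
)

if __name__ == "__main__":
    deployment.apply()
```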
The entire repository is built so that you can reuse the same code for development and production environments. No environment information is hardcoded here:
- all blocks and work-queues are by default created with the name default, so that differentiating between dev and prod environments is as easy as pointing to the development or production Prefect Cloud workspace
- if you are on the OSS version, you can accomplish the same by pointing to a different PREFECT_API_URL of your dev vs. prod Orion API server instances (the assumption is that those are separate servers)
- for both Prefect Cloud and self-hosted Orion, all credentials can be securely stored using blocks to avoid reliance on environment variables baked into your execution environment, or hardcoded configuration values.
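For example, a credential could be stored once as a block and then loaded by name in any environment; the block name and value below are placeholders:

```python
from prefect.blocks.system import Secret

# Store the credential once, e.g. from a one-off setup script.
Secret(value="top-secret-snowflake-password").save("snowflake-password", overwrite=True)

# Load it inside any flow, in any workspace, without hardcoding the value.
password = Secret.load("snowflake-password").get()
```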
🧠 "Switching between
dev
,staging
andprod
environments, various infrastructures, storage, and cloud data warehouses was never easier. CI/CD became almost enjoyable!" - Marvin
To switch the above setup from S3 to GCS or Azure Blob Storage, it's a matter of pointing to a different storage block. All blocks are defined in utilities/create_blocks.py.
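As a rough sketch of that pattern (bucket paths are placeholders; the actual block definitions live in utilities/create_blocks.py):

```python
from prefect.filesystems import GCS, S3

# Default S3 storage block used by the deployments.
S3(bucket_path="my-bucket/prefect-flows").save("default", overwrite=True)

# Switching to GCS only requires saving a GCS block and pointing your
# deployment's storage block reference at it.
GCS(bucket_path="my-gcs-bucket/prefect-flows").save("default", overwrite=True)
```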
To add a schedule to a deployment, use the command prefect deployment set-schedule and reference the deployment in the format flow_name/deployment_name:
prefect deployment set-schedule jaffle-shop-ingest-transform/default --cron "0 * * * *"
prefect deployment set-schedule attribution-ingest-transform/default --cron "0 * * * *"
# For more options, check:
prefect deployment set-schedule --help
You can also attach a schedule directly during prefect deployment build if you prefer to do it that way. You can add a schedule using the --cron, --interval, or --rrule flag:
prefect deployment build -q default -n default -a flows/ingestion/ingest_jaffle_shop.py:raw_data_jaffle_shop --interval 3600
# For more options, check:
prefect deployment build --help
💡 The easiest way to attach a schedule to a deployment is to do it directly from the UI. Go to the deployment's UI page and add a schedule from there. The Prefect documentation provides more information about that.
Use the command prefect deployment run and reference your deployment in the format flow_name/deployment_name:
prefect deployment run jaffle-shop-ingest-transform/default
prefect deployment run attribution-ingest-transform/default
# Start an agent to create runs from deployments
prefect agent start -q default
The last line starts an agent polling from the work queue default. You don't have to create the work queue first -- Prefect will create it automatically if it doesn't exist yet.
Commands to run all flows from a deployment via CLI are available in utilities/run_deployments_from_cli.bash.
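If you prefer Python over the CLI, the same deployments can also be triggered programmatically; a small sketch, assuming a recent Prefect 2 release where run_deployment is available:

```python
from prefect.deployments import run_deployment

# Create a run for each deployment referenced as flow_name/deployment_name.
run_deployment(name="jaffle-shop-ingest-transform/default")
run_deployment(name="attribution-ingest-transform/default")
```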
I got a RuntimeError: fatal: Invalid --project-dir flag. Not a dbt project. Missing dbt_project.yml file
This means that either:
- the project_dir doesn't correctly point to your dbt project -- make sure that you provide the correct path (relative or absolute); alternatively, put the dbt project into your project's root directory -- this way, the trigger_dbt_cli_command task will be able to find it directly
- when your agent deploys a flow to some custom infrastructure, it copies the files from your remote storage into a /tmp directory for the flow run execution; it might be that some relative path to your dbt project directory doesn't work the same way when running within the agent's environment -- you can avoid that by either having your dbt project in the Prefect project's root directory, or by using absolute paths (see the sketch below).
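A minimal sketch of the absolute-path approach (the dbt project path below is a placeholder):

```python
from pathlib import Path

from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command

# Resolve the dbt project directory relative to this file rather than the current
# working directory, so the path still works when the agent runs the flow from a
# /tmp copy of your remote storage.
DBT_PROJECT_DIR = str((Path(__file__).parent / "dbt" / "jaffle_shop").resolve())


@flow
def dbt_debug():
    return trigger_dbt_cli_command(command="dbt debug", project_dir=DBT_PROJECT_DIR)
```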
You can leverage Prefect CLI profiles to easily switch between environments.
Go to the UI and create a workspace if you don't have one already. Then create an API key and use it to fill in the information below.
prefect profile create dev
prefect cloud workspace set --workspace "your_prefect_cloud_account/dev"
prefect config set PREFECT_API_KEY=pnu_topsecret123456789
Now do the same for production.
prefect profile create prod
prefect cloud workspace set --workspace "your_prefect_cloud_account/prod"
prefect config set PREFECT_API_KEY=this_must_be_a_different_api_key_as_they_are_bound_to_workspace
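With both profiles created, switching between environments is then a single command:

prefect profile use dev
prefect profile use prod
# List all profiles and see which one is active:
prefect profile ls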
- no hardcoding of credentials or environment information: when you are in the dev workspace, everything works the same way as in prod but based on your dev credentials; you can move to production with no modifications to your code simply by switching to a prod workspace
- leveraging blocks to make it easy to change variables (such as the dbt project path, the number of retries, even the emojis for your dbt tasks)
- easily rotate credentials
- track metadata and capabilities of each block
- reuse of components across teams while still being able to take the same block logic and apply it in a different setting, simply by creating another block of the same type and pointing to that new block name in your workflow logic (see the distinction between the dbt blocks for jaffle_shop and attribution in this file)
- switching between execution in a local process, a Docker container, or a Kubernetes cluster is as simple as switching the infrastructure block in your deployment CLI command
- switching from local execution on your machine to the dev and production environments is as simple as pointing to a different workspace and CLI profile
- simple dbt flows are parametrized to make it easy to execute any dbt command for the relevant project
- all ingestion flows are parametrized to make backfills a breeze -- just change the start date, end date, and interval in your parameter values when triggering the run, and you can trigger a backfill from the same flow as usual; see the flow parameters e.g. in flows/ingestion/ingest_jaffle_shop.py and the sketch after this list
- optionally, you can track each dbt model or test as a separate task by using the dbt_run_from_manifest.py logic -- dataplatform/blocks/dbt.py
- it's fun to use! if you want to change what your dbt tasks look like for Halloween or Christmas, change the emoji on the block 🤗
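To illustrate the backfill pattern, here is a minimal sketch of such a parametrized ingestion flow; the parameter names and the ingestion step are simplified assumptions rather than the repository's exact code (see flows/ingestion/ingest_jaffle_shop.py for the real parameters):

```python
from datetime import date, timedelta

from prefect import flow, get_run_logger


@flow
def raw_data_jaffle_shop(
    start_date: date = date(2022, 10, 1),
    end_date: date = date(2022, 10, 2),
    interval_days: int = 1,
):
    logger = get_run_logger()
    current = start_date
    while current < end_date:
        # The same flow serves scheduled runs and backfills: for a backfill,
        # just trigger a run with an earlier start_date/end_date and the
        # desired interval in the parameter values.
        logger.info("Ingesting raw jaffle_shop data for %s", current)
        current += timedelta(days=interval_days)
```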