dbt perf tuning #581

Draft · wants to merge 15 commits into main
15 changes: 15 additions & 0 deletions CONTRIBUTING.md
@@ -35,3 +35,18 @@ terraform-docs .
```

This will update the README files at `deploy/infrastructure/README.md` and `deploy/meltano/README.md` with any changes made to the module and header docs.

## Developing transforms

To develop transforms, you'll need to duplicate `.env.template` as `.env` and ensure that at least these env vars are declared (see the sketch after this list):

- `SNOWFLAKE_USER`
- `SNOWFLAKE_PASSWORD`
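
A minimal sketch of that setup, assuming you are working from the `data/` directory (the values shown are placeholders):

```console
cp .env.template .env
# then edit .env so that at least these are set:
#   SNOWFLAKE_USER=<your Snowflake username>
#   SNOWFLAKE_PASSWORD=<your Snowflake password>
```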

## Incremental build with `--defer` option

```console
meltano invoke dbt-snowflake:seed
meltano invoke dbt-snowflake:snapshot
meltano invoke dbt-snowflake:build
```
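
For the `--defer` part of the workflow, dbt's `--defer`/`--state` flags resolve upstream models that are not selected against a previous run's artifacts instead of rebuilding them. How the production `manifest.json` is fetched is not covered here, so the following is only a hedged sketch (the artifacts path is a placeholder), relying on `meltano invoke` forwarding extra arguments to dbt:

```console
# Hypothetical sketch: build only modified models, deferring unselected
# upstream refs to the artifacts of a prior (e.g. production) run.
meltano invoke dbt-snowflake build \
  --select state:modified+ --defer --state path/to/prod-artifacts
```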
5 changes: 2 additions & 3 deletions data/.env.template
@@ -19,10 +19,9 @@ PERMISSION_BOT_PASSWORD="****"
PERMISSION_BOT_ACCOUNT="****"

# Snowflake MELTANO User Password
SNOWFLAKE_USER="...."
SNOWFLAKE_PASSWORD="****"

# dbt Snowflake Password
DBT_SNOWFLAKE_PASSWORD="****"
SNOWFLAKE_ROLE="DEVELOPER"

# Slack Webhooks
TARGET_APPRISE_SINGER_ACTIVITY_URIS=["https://hooks.slack.com/services/{}/{}/{}"]
4 changes: 3 additions & 1 deletion data/.sqlfluff
@@ -17,7 +17,9 @@ profile = meltano
tab_space_size = 4
max_line_length = 80
indent_unit = space
comma_style = trailing

[sqlfluff:layout:type:comma]
line_position = trailing

[sqlfluff:rules:L010] # Keywords
capitalisation_policy = upper
42 changes: 42 additions & 0 deletions data/environments/dbt_dev.meltano.yml
@@ -0,0 +1,42 @@
environments:
- name: dbt_dev
  annotations:
    docs:
      description: >
        Environment for developing dbt transforms separately from EL.
  config:
    plugins:
      utilities:
      - name: dbt-snowflake
        config:
          user: ${SNOWFLAKE_USER}
          role: ${SNOWFLAKE_USER}
          warehouse: CORE
          skip_pre_invoke: true
          database: USERDEV_PROD
          database_prep: USERDEV_PREP
          target_schema_prefix: ${SNOWFLAKE_USER}_
      - name: sqlfluff
        config:
          user: ${SNOWFLAKE_USER}
      - name: great_expectations
        config:
          prod_database: USERDEV_PROD
          raw_database: USERDEV_RAW
          username: ${SNOWFLAKE_USER}
          role: ${SNOWFLAKE_USER}
          warehouse: CORE
  env:
    USER_PREFIX: ${SNOWFLAKE_USER}
    SUPERSET_API_URL: http://localhost:8088
    SUPERSET_USER: admin
    SUPERSET_PASS: admin
    # https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html
    AIRFLOW__CORE__PLUGINS_FOLDER: $MELTANO_PROJECT_ROOT/orchestrate/plugins_local
    AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: '30'
    AIRFLOW_VAR_MELTANO_ENVIRONMENT: userdev
    AIRFLOW_VAR_OPERATOR_TYPE: bash
    # Secrets via KMS
    KMS_PUBLIC_KEY_PATH: utilities/kms/Publickey.pem
    KMS_DOTENV_PATH: .env
    KMS_SECRETS_PATH: secrets.yml
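
As a hedged usage note: once this file is included, the environment can be selected per invocation with Meltano's `--environment` flag (or the `MELTANO_ENVIRONMENT` variable), for example:

```console
# Run the dbt steps from CONTRIBUTING.md against the dbt_dev environment.
meltano --environment=dbt_dev invoke dbt-snowflake:seed
meltano --environment=dbt_dev invoke dbt-snowflake:snapshot
meltano --environment=dbt_dev invoke dbt-snowflake:build
```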
8 changes: 4 additions & 4 deletions data/environments/userdev.meltano.yml
@@ -22,8 +22,8 @@ environments:
- name: tap-snowflake
config:
dbname: USERDEV_PROD
user: ${USER_PREFIX}
role: ${USER_PREFIX}
user: ${SNOWFLAKE_USER}
role: ${SNOWFLAKE_USER}
warehouse: CORE
- name: tap-snowflake-metrics-legacy
config:
@@ -85,7 +85,7 @@
role: ${USER_PREFIX}
warehouse: CORE
env:
USER_PREFIX: PNADOLNY
USER_PREFIX: ${SNOWFLAKE_USER}
SUPERSET_API_URL: http://localhost:8088
SUPERSET_USER: admin
SUPERSET_PASS: admin
@@ -97,4 +97,4 @@
# Secrets via KMS
KMS_PUBLIC_KEY_PATH: utilities/kms/Publickey.pem
KMS_DOTENV_PATH: .env
KMS_SECTRETS_PATH: secrets.yml
KMS_SECRETS_PATH: secrets.yml
4 changes: 3 additions & 1 deletion data/meltano.yml
@@ -1,5 +1,5 @@
version: 1
default_environment: userdev
default_environment: dbt_dev
send_anonymous_usage_stats: false
project_id: c15e971a-d318-4a9d-979b-1039ce5fd1b1
include_paths:
@@ -9,3 +9,5 @@ include_paths:
- ./orchestrate/*.meltano.yml
- ./transform/*.meltano.yml
- ./utilities/*.meltano.yml
ff:
strict_env_var_mode: true
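
`strict_env_var_mode` is a Meltano feature flag that makes references to unset environment variables fail the command instead of silently expanding to empty strings. A rough way to confirm the behavior locally (illustrative only, not part of this change):

```console
# With strict_env_var_mode enabled, this should error out rather than
# run with an empty SNOWFLAKE_USER.
unset SNOWFLAKE_USER
meltano --environment=dbt_dev invoke dbt-snowflake:build
```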
2 changes: 1 addition & 1 deletion data/requirements.txt
@@ -1,2 +1,2 @@
meltano==2.10.0
meltano==2.16.1
werkzeug>=2.1,<=2.1.3
@@ -1,6 +1,7 @@
{{
config(
materialized='incremental'
materialized='incremental',
cluster_by=['event_id']
)
}}

@@ -1,3 +1,9 @@
{{
config(
materialized='table'
)
}}

WITH base AS (

SELECT
@@ -1,3 +1,9 @@
{{
config(
materialized='table'
)
}}

WITH reparse_1 AS (
-- Incident: new schema not registered to SnowcatCloud versions 2.14.0 and
-- 2.15.0 (partial)
210 changes: 12 additions & 198 deletions data/transform/models/staging/snowplow/stg_snowplow__events.sql
@@ -1,212 +1,26 @@
-- TODO: Debug performance. As of Feb 28, build time is approximately 40 minutes
{{
config(
materialized='incremental'
materialized='table'
)
}}
WITH blended_source AS (

SELECT
*,
FALSE AS snowplow_bad_parsed
{% if env_var("MELTANO_ENVIRONMENT") == "cicd" %}

FROM raw.snowplow.events
WHERE derived_tstamp::TIMESTAMP >= DATEADD('day', -3, CURRENT_DATE)
{% else %}

FROM {{ source('snowplow', 'events') }}

{% if is_incremental() %}

WHERE UPLOADED_AT >= (SELECT max(UPLOADED_AT) FROM {{ this }} WHERE snowplow_bad_parsed = FALSE)

{% endif %}
{% endif %}

UNION ALL

SELECT
*,
TRUE AS snowplow_bad_parsed
FROM {{ ref('snowplow_bad_parsed') }}
{% if is_incremental() %}

WHERE event_id NOT IN (SELECT DISTINCT event_id FROM {{ this }} WHERE snowplow_bad_parsed = TRUE)

{% endif %}

),

source AS (
WITH source AS (

SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY
event_id
ORDER BY derived_tstamp::TIMESTAMP DESC
) AS row_num
FROM blended_source

),

clean_new_source AS (

SELECT *
FROM source
WHERE row_num = 1
{% if is_incremental() %}

AND event_id NOT IN (
SELECT DISTINCT event_id FROM {{ this }}
)
{% endif %}

),

renamed AS (

SELECT -- noqa: L034
-- only meltano events. For the first ~6 months no app_id was
-- sent from Meltano. So nulls are from meltano.
COALESCE(app_id, 'meltano') AS app_id,
platform,
etl_tstamp::TIMESTAMP AS etl_enriched_at,
collector_tstamp::TIMESTAMP AS collector_received_at,
dvce_created_tstamp::TIMESTAMP AS device_created_at,
dvce_sent_tstamp::TIMESTAMP AS device_sent_at,
refr_dvce_tstamp::TIMESTAMP AS refr_device_processed_at,
derived_tstamp::TIMESTAMP AS event_created_at,
derived_tstamp::DATE AS event_created_date,
true_tstamp::TIMESTAMP AS true_sent_at,
uploaded_at,
event,
event_id,
txn_id,
name_tracker,
v_tracker,
v_collector,
v_etl,
user_id,
user_ipaddress,
user_fingerprint,
domain_userid,
domain_sessionidx,
network_userid,
geo_country,
geo_region,
geo_city,
geo_zipcode,
geo_latitude,
geo_longitude,
geo_region_name,
ip_isp,
ip_organization,
ip_domain,
ip_netspeed,
page_url,
page_title,
page_referrer,
page_urlscheme,
page_urlhost,
page_urlport,
page_urlpath,
page_urlquery,
page_urlfragment,
refr_urlscheme,
refr_urlhost,
refr_urlport,
refr_urlpath,
refr_urlquery,
refr_urlfragment,
refr_medium,
refr_source,
refr_term,
mkt_medium,
mkt_source,
mkt_term,
mkt_content,
mkt_campaign,
contexts,
se_category,
se_action,
se_label,
se_property,
se_value,
unstruct_event,
tr_orderid,
tr_affiliation,
tr_total,
tr_tax,
tr_shipping,
tr_city,
tr_state,
tr_country,
ti_orderid,
ti_sku,
ti_name,
ti_category,
ti_price,
ti_quantity,
pp_xoffset_min,
pp_xoffset_max,
pp_yoffset_min,
pp_yoffset_max,
useragent,
br_name,
br_family,
br_version,
br_type,
br_renderengine,
br_lang,
br_features_pdf,
br_features_flash,
br_features_java,
br_features_director,
br_features_quicktime,
br_features_realplayer,
br_features_windowsmedia,
br_features_gears,
br_features_silverlight,
br_cookies,
br_colordepth,
br_viewwidth,
br_viewheight,
os_name,
os_family,
os_manufacturer,
os_timezone,
dvce_type,
dvce_ismobile,
dvce_screenwidth,
dvce_screenheight,
doc_charset,
doc_width,
doc_height,
tr_currency,
tr_total_base,
tr_tax_base,
tr_shipping_base,
ti_currency,
ti_price_base,
base_currency,
geo_timezone,
mkt_clickid,
mkt_network,
etl_tags,
refr_domain_userid,
derived_contexts::VARIANT AS derived_contexts,
domain_sessionid,
event_vendor,
event_name,
event_format,
event_version,
event_fingerprint,
MD5(user_ipaddress) AS ip_address_hash,
snowplow_bad_parsed
FROM clean_new_source

ORDER BY event_created_at::TIMESTAMP DESC
) AS dedupe_rank
FROM {{ ref('stg_snowplow__events_union_all') }}
-- {% if is_incremental() %}
-- -- TODO: Is this safe or would we lose records?:
-- WHERE uploaded_at >= (SELECT max(UPLOADED_AT) FROM {{ this }})
-- {% endif %}
)

SELECT *
FROM renamed
FROM source
WHERE dedupe_rank = 1
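
To iterate on the performance of this model specifically, one hedged sketch (assuming `meltano invoke` forwards extra arguments to dbt and the `dbt_dev` environment from this PR is active) would be:

```console
# Hypothetical: rebuild only the reworked staging model and its descendants,
# to compare wall-clock time against the ~40 minute baseline in the TODO.
meltano --environment=dbt_dev invoke dbt-snowflake run --select stg_snowplow__events+
```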