Skip to content

Commit

Permalink
Merge pull request #302 from ScalefreeCOM/298-bug-redshift-not-using-…
Browse files Browse the repository at this point in the history
…qualify-for-latest-records

Redshift: Use the QUALIFY clause instead of prep-CTEs to enhance performance
  • Loading branch information
tkiehn authored Jan 10, 2025
2 parents e397ce4 + 261d7ab commit 27f64ae
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 153 deletions.
45 changes: 11 additions & 34 deletions macros/tables/redshift/eff_sat_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,13 @@ source_data AS (
In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite.
#}
{%- if is_incremental() %}
current_status_prep AS (

SELECT
{{ tracked_hashkey }},
{{ is_active_alias}},
ROW_NUMBER() OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) as rn
FROM {{ this }}

),

current_status AS (

SELECT
{{ tracked_hashkey }},
{{ is_active_alias }}
FROM current_status_prep
WHERE rn = 1
FROM {{ this }} redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
QUALIFY ROW_NUMBER() OVER(PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1

),
{% endif %}
Expand Down Expand Up @@ -136,32 +126,19 @@ current_status AS (

{#
The rows are deduplicated on the is_active_alias, to only include status changes.
Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status.
#}
deduplicated_incoming_prep AS (

SELECT
is_active.{{ tracked_hashkey }},
is_active.{{ src_ldts }},
is_active.{{ is_active_alias }},
LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active

FROM is_active

),

deduplicated_incoming AS (

SELECT
deduplicated_incoming_prep.{{ tracked_hashkey }},
deduplicated_incoming_prep.{{ src_ldts }},
deduplicated_incoming_prep.{{ is_active_alias }}

FROM
deduplicated_incoming_prep
WHERE
deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active
OR deduplicated_incoming_prep.lag_is_active IS NULL
ia.{{ tracked_hashkey }},
ia.{{ src_ldts }},
ia.{{ is_active_alias }}
FROM is_active ia
QUALIFY
CASE
WHEN ia.{{ is_active_alias }} = LAG(ia.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE
ELSE TRUE
END

),

Expand Down
16 changes: 6 additions & 10 deletions macros/tables/redshift/hub.sql
Original file line number Diff line number Diff line change
Expand Up @@ -207,21 +207,17 @@ source_new_union AS (

{%- endif %}

earliest_hk_over_all_sources_prep AS (
SELECT
lcte.*,
ROW_NUMBER() OVER (PARTITION BY {{ hashkey }} ORDER BY {{ src_ldts
}}) as rn
FROM {{ ns.last_cte }} AS lcte),

earliest_hk_over_all_sources AS (

{#- Deduplicate the unioned records again so that only the earliest record per hashkey is inserted. #}
SELECT
lcte.*
FROM earliest_hk_over_all_sources_prep AS lcte
WHERE rn = 1
{%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}),
FROM {{ ns.last_cte }} AS lcte
QUALIFY ROW_NUMBER() OVER (PARTITION BY {{ hashkey }} ORDER BY {{ src_ldts }}) = 1

{%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}

),

records_to_insert AS (
{#- Select everything from the previous CTE, if incremental filter for hashkeys that are not already in the hub. #}
Expand Down
16 changes: 6 additions & 10 deletions macros/tables/redshift/link.sql
Original file line number Diff line number Diff line change
Expand Up @@ -210,21 +210,17 @@ source_new_union AS (

{%- endif %}

earliest_hk_over_all_sources_prep AS (
SELECT
lcte.*,
ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts
}}) as rn
FROM {{ ns.last_cte }} AS lcte),

earliest_hk_over_all_sources AS (

{#- Deduplicate the unioned records again so that only the earliest record per link hashkey is inserted. #}
SELECT
lcte.*
FROM earliest_hk_over_all_sources_prep AS lcte
WHERE rn = 1
{%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}),
FROM {{ ns.last_cte }} AS lcte
QUALIFY ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts }}) = 1

{%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}

),

records_to_insert AS (
{# Select everything from the previous CTE, if incremental filter for hashkeys that are not already in the link. #}
Expand Down
31 changes: 8 additions & 23 deletions macros/tables/redshift/ma_sat_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,44 +41,29 @@ source_data AS (

{# Get the latest record for each parent hashkey in existing sat, if incremental. #}
{%- if is_incremental() %}
latest_entries_in_sat_prep AS (

SELECT
{{ parent_hashkey }},
{{ ns.hdiff_alias }},
ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }} DESC) as rn
FROM
{{ this }}
),

latest_entries_in_sat AS (

SELECT
{{ parent_hashkey }},
{{ ns.hdiff_alias }}
FROM
latest_entries_in_sat_prep
WHERE rn = 1
{{ this }} redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
QUALIFY ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1
),
{%- endif %}

{# Get a list of all distinct hashdiffs that exist for each parent_hashkey. #}
lag_source_data AS (
SELECT
{{ parent_hashkey }},
{{ src_ldts }},
{{ ns.hdiff_alias }},
LAG({{ ns.hdiff_alias }}) OVER (PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) as prev_ns_hdiff_alias
FROM source_data
),

deduped_row_hashdiff AS (

SELECT
{{ parent_hashkey }},
{{ src_ldts }},
{{ ns.hdiff_alias }}
FROM lag_source_data
WHERE {{ ns.hdiff_alias }} != prev_ns_hdiff_alias OR prev_ns_hdiff_alias IS NULL
FROM source_data redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
QUALIFY CASE
WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER (PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE
ELSE TRUE
END
),

{# Dedupe the source data regarding non-delta groups. #}
Expand Down
18 changes: 7 additions & 11 deletions macros/tables/redshift/nh_link.sql
Original file line number Diff line number Diff line change
Expand Up @@ -226,21 +226,17 @@ source_new_union AS (

{%- if not source_is_single_batch %}

earliest_hk_over_all_sources_prep AS (
SELECT
lcte.*,
ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts
}}) as rn
FROM {{ ns.last_cte }} AS lcte),

earliest_hk_over_all_sources AS (

{#- Deduplicate the unionized records again to only insert the earliest one. #}
{#- Deduplicate the unionized records again to only insert the earliest one. #}
SELECT
lcte.*
FROM earliest_hk_over_all_sources_prep AS lcte
WHERE rn = 1
{%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}),
FROM {{ ns.last_cte }} AS lcte
QUALIFY ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts }}) = 1

{%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}

),

{%- endif %}

Expand Down
44 changes: 10 additions & 34 deletions macros/tables/redshift/ref_sat_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,6 @@ source_data AS (

{# Get the latest record for each parent ref key combination in existing sat, if incremental. #}
{%- if is_incremental() %}
latest_entries_in_sat_prep AS (

SELECT
{% for ref_key in parent_ref_keys %}
{{ref_key}},
{% endfor %}
{{ ns.hdiff_alias }},
ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key|lower}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }} DESC) as rn
FROM
{{ this }}
),

latest_entries_in_sat AS (

SELECT
Expand All @@ -66,16 +54,16 @@ latest_entries_in_sat AS (
{% endfor %}
{{ ns.hdiff_alias }}
FROM
latest_entries_in_sat_prep
WHERE rn = 1
{{ this }} redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
QUALIFY ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }} DESC) = 1
),
{%- endif %}

{#
Deduplicate source by comparing each hashdiff to the hashdiff of the previous record, for each parent ref key combination.
Additionally, a row number based on that order is added if the run is incremental.
#}
deduplicated_numbered_source_prep AS (
deduplicated_numbered_source AS (

SELECT
{% for ref_key in parent_ref_keys %}
Expand All @@ -86,24 +74,12 @@ deduplicated_numbered_source_prep AS (
{% if is_incremental() -%}
, ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) as rn
{%- endif %}
, LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key|lower}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) as prev_hashdiff
FROM source_data
),

deduplicated_numbered_source AS (

SELECT
{% for ref_key in parent_ref_keys %}
{{ref_key}},
{% endfor %}
{{ ns.hdiff_alias }},
{{ datavault4dbt.print_list(source_cols) }}
FROM deduplicated_numbered_source_prep
WHERE 1=1
AND {{ ns.hdiff_alias }} <> prev_hashdiff OR prev_hashdiff IS NULL
{% if is_incremental() -%}
AND rn = 1
{%- endif %}
FROM source_data redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
QUALIFY
CASE
WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) THEN FALSE
ELSE TRUE
END
),

{#
Expand All @@ -128,7 +104,7 @@ records_to_insert AS (
AND {{ datavault4dbt.multikey(ref_key, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }}
{% endfor %}
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }}
)
AND deduplicated_numbered_source.rn = 1)
{%- endif %}

)
Expand Down
42 changes: 11 additions & 31 deletions macros/tables/redshift/sat_v0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -42,32 +42,22 @@ source_data AS (

{# Get the latest record for each parent hashkey in existing sat, if incremental. #}
{%- if is_incremental() %}
latest_entries_in_sat_prep AS (

SELECT
{{ parent_hashkey }},
{{ ns.hdiff_alias }},
ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }} DESC) as rn
FROM
{{ this }}
),

latest_entries_in_sat AS (

SELECT
{{ parent_hashkey }},
{{ ns.hdiff_alias }}
FROM
latest_entries_in_sat_prep
WHERE rn = 1
{{ this }} redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
QUALIFY ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1
),
{%- endif %}

{#
Deduplicate source by comparing each hashdiff to the hashdiff of the previous record, for each hashkey.
Additionally, a row number based on that order is added if the run is incremental.
#}
deduplicated_numbered_source_prep AS (
deduplicated_numbered_source AS (

SELECT
{{ parent_hashkey }},
Expand All @@ -76,23 +66,12 @@ deduplicated_numbered_source_prep AS (
{% if is_incremental() -%}
, ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) as rn
{%- endif %}
, LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }}) as prev_hashdiff
FROM source_data

),

deduplicated_numbered_source AS (

SELECT
{{ parent_hashkey }},
{{ ns.hdiff_alias }},
{{ datavault4dbt.print_list(source_cols) }}
FROM deduplicated_numbered_source_prep
WHERE 1=1
AND {{ ns.hdiff_alias }} <> prev_hashdiff OR prev_hashdiff IS NULL
{% if is_incremental() -%}
AND rn = 1
{%- endif %}
FROM source_data redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from
QUALIFY
CASE
WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE
ELSE TRUE
END
),

{#
Expand All @@ -111,7 +90,8 @@ records_to_insert AS (
SELECT 1
FROM latest_entries_in_sat
WHERE {{ datavault4dbt.multikey(parent_hashkey, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }}
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }})
AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }}
AND deduplicated_numbered_source.rn = 1)
{%- endif %}

)
Expand Down

0 comments on commit 27f64ae

Please sign in to comment.