diff --git a/macros/tables/redshift/eff_sat_v0.sql b/macros/tables/redshift/eff_sat_v0.sql index 3483db4a..8e734972 100644 --- a/macros/tables/redshift/eff_sat_v0.sql +++ b/macros/tables/redshift/eff_sat_v0.sql @@ -49,23 +49,13 @@ source_data AS ( In all incremental cases, the current status for each hashkey is selected from the existing Effectivity Satellite. #} {%- if is_incremental() %} -current_status_prep AS ( - - SELECT - {{ tracked_hashkey }}, - {{ is_active_alias}}, - ROW_NUMBER() OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) as rn - FROM {{ this }} - -), - current_status AS ( SELECT {{ tracked_hashkey }}, {{ is_active_alias }} - FROM current_status_prep - WHERE rn = 1 + FROM {{ this }} redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from + QUALIFY ROW_NUMBER() OVER(PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1 ), {% endif %} @@ -136,32 +126,19 @@ current_status AS ( {# The rows are deduplicated on the is_active_alias, to only include status changes. - Additionally, a ROW_NUMBER() is calculated in incremental runs, to use it in the next step for comparison against the current status. #} - deduplicated_incoming_prep AS ( - - SELECT - is_active.{{ tracked_hashkey }}, - is_active.{{ src_ldts }}, - is_active.{{ is_active_alias }}, - LAG(is_active.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) as lag_is_active - - FROM is_active - - ), - deduplicated_incoming AS ( SELECT - deduplicated_incoming_prep.{{ tracked_hashkey }}, - deduplicated_incoming_prep.{{ src_ldts }}, - deduplicated_incoming_prep.{{ is_active_alias }} - - FROM - deduplicated_incoming_prep - WHERE - deduplicated_incoming_prep.{{ is_active_alias }} != deduplicated_incoming_prep.lag_is_active - OR deduplicated_incoming_prep.lag_is_active IS NULL + ia.{{ tracked_hashkey }}, + ia.{{ src_ldts }}, + ia.{{ is_active_alias }} + FROM is_active ia + QUALIFY + CASE + WHEN ia.{{ is_active_alias }} = LAG(ia.{{ is_active_alias }}) OVER (PARTITION BY {{ tracked_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE + ELSE TRUE + END ), diff --git a/macros/tables/redshift/hub.sql b/macros/tables/redshift/hub.sql index dc45edf9..9418f59e 100644 --- a/macros/tables/redshift/hub.sql +++ b/macros/tables/redshift/hub.sql @@ -207,21 +207,17 @@ source_new_union AS ( {%- endif %} -earliest_hk_over_all_sources_prep AS ( - SELECT - lcte.*, - ROW_NUMBER() OVER (PARTITION BY {{ hashkey }} ORDER BY {{ src_ldts - }}) as rn - FROM {{ ns.last_cte }} AS lcte), - earliest_hk_over_all_sources AS ( {#- Deduplicate the unionized records again to only insert the earliest one. #} SELECT lcte.* - FROM earliest_hk_over_all_sources_prep AS lcte - WHERE rn = 1 - {%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}), + FROM {{ ns.last_cte }} AS lcte + QUALIFY ROW_NUMBER() OVER (PARTITION BY {{ hashkey }} ORDER BY {{ src_ldts }}) = 1 + + {%- set ns.last_cte = 'earliest_hk_over_all_sources' -%} + +), records_to_insert AS ( {#- Select everything from the previous CTE, if incremental filter for hashkeys that are not already in the hub. #} diff --git a/macros/tables/redshift/link.sql b/macros/tables/redshift/link.sql index 05ccbe8a..9e67e1d9 100644 --- a/macros/tables/redshift/link.sql +++ b/macros/tables/redshift/link.sql @@ -210,21 +210,17 @@ source_new_union AS ( {%- endif %} -earliest_hk_over_all_sources_prep AS ( - SELECT - lcte.*, - ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts - }}) as rn - FROM {{ ns.last_cte }} AS lcte), - earliest_hk_over_all_sources AS ( {#- Deduplicate the unionized records again to only insert the earliest one. #} SELECT lcte.* - FROM earliest_hk_over_all_sources_prep AS lcte - WHERE rn = 1 - {%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}), + FROM {{ ns.last_cte }} AS lcte + QUALIFY ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts }}) = 1 + + {%- set ns.last_cte = 'earliest_hk_over_all_sources' -%} + +), records_to_insert AS ( {# Select everything from the previous CTE, if incremental filter for hashkeys that are not already in the link. #} diff --git a/macros/tables/redshift/ma_sat_v0.sql b/macros/tables/redshift/ma_sat_v0.sql index 5e322ab2..4f59994c 100644 --- a/macros/tables/redshift/ma_sat_v0.sql +++ b/macros/tables/redshift/ma_sat_v0.sql @@ -41,44 +41,29 @@ source_data AS ( {# Get the latest record for each parent hashkey in existing sat, if incremental. #} {%- if is_incremental() %} -latest_entries_in_sat_prep AS ( - - SELECT - {{ parent_hashkey }}, - {{ ns.hdiff_alias }}, - ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }} DESC) as rn - FROM - {{ this }} -), - latest_entries_in_sat AS ( SELECT {{ parent_hashkey }}, {{ ns.hdiff_alias }} FROM - latest_entries_in_sat_prep - WHERE rn = 1 + {{ this }} redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from + QUALIFY ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1 ), {%- endif %} {# Get a list of all distinct hashdiffs that exist for each parent_hashkey. #} - lag_source_data AS ( - SELECT - {{ parent_hashkey }}, - {{ src_ldts }}, - {{ ns.hdiff_alias }}, - LAG({{ ns.hdiff_alias }}) OVER (PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) as prev_ns_hdiff_alias - FROM source_data -), - deduped_row_hashdiff AS ( + SELECT {{ parent_hashkey }}, {{ src_ldts }}, {{ ns.hdiff_alias }} - FROM lag_source_data - WHERE {{ ns.hdiff_alias }} != prev_ns_hdiff_alias OR prev_ns_hdiff_alias IS NULL + FROM source_data redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from + QUALIFY CASE + WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER (PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE + ELSE TRUE + END ), {# Dedupe the source data regarding non-delta groups. #} diff --git a/macros/tables/redshift/nh_link.sql b/macros/tables/redshift/nh_link.sql index 38ec0368..b4e895c8 100644 --- a/macros/tables/redshift/nh_link.sql +++ b/macros/tables/redshift/nh_link.sql @@ -226,21 +226,17 @@ source_new_union AS ( {%- if not source_is_single_batch %} -earliest_hk_over_all_sources_prep AS ( - SELECT - lcte.*, - ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts - }}) as rn - FROM {{ ns.last_cte }} AS lcte), - earliest_hk_over_all_sources AS ( - {#- Deduplicate the unionized records again to only insert the earliest one. #} +{#- Deduplicate the unionized records again to only insert the earliest one. #} SELECT lcte.* - FROM earliest_hk_over_all_sources_prep AS lcte - WHERE rn = 1 - {%- set ns.last_cte = 'earliest_hk_over_all_sources' -%}), + FROM {{ ns.last_cte }} AS lcte + QUALIFY ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts }}) = 1 + + {%- set ns.last_cte = 'earliest_hk_over_all_sources' -%} + +), {%- endif %} diff --git a/macros/tables/redshift/ref_sat_v0.sql b/macros/tables/redshift/ref_sat_v0.sql index 6ce47239..4fceff2d 100644 --- a/macros/tables/redshift/ref_sat_v0.sql +++ b/macros/tables/redshift/ref_sat_v0.sql @@ -46,18 +46,6 @@ source_data AS ( {# Get the latest record for each parent ref key combination in existing sat, if incremental. #} {%- if is_incremental() %} -latest_entries_in_sat_prep AS ( - - SELECT - {% for ref_key in parent_ref_keys %} - {{ref_key}}, - {% endfor %} - {{ ns.hdiff_alias }}, - ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key|lower}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }} DESC) as rn - FROM - {{ this }} -), - latest_entries_in_sat AS ( SELECT @@ -66,8 +54,8 @@ latest_entries_in_sat AS ( {% endfor %} {{ ns.hdiff_alias }} FROM - latest_entries_in_sat_prep - WHERE rn = 1 + {{ this }} redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from + QUALIFY ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }} DESC) = 1 ), {%- endif %} @@ -75,7 +63,7 @@ latest_entries_in_sat AS ( Deduplicate source by comparing each hashdiff to the hashdiff of the previous record, for each parent ref key combination. Additionally adding a row number based on that order, if incremental. #} -deduplicated_numbered_source_prep AS ( +deduplicated_numbered_source AS ( SELECT {% for ref_key in parent_ref_keys %} @@ -86,24 +74,12 @@ deduplicated_numbered_source_prep AS ( {% if is_incremental() -%} , ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) as rn {%- endif %} - , LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key|lower}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) as prev_hashdiff - FROM source_data -), - -deduplicated_numbered_source AS ( - - SELECT - {% for ref_key in parent_ref_keys %} - {{ref_key}}, - {% endfor %} - {{ ns.hdiff_alias }}, - {{ datavault4dbt.print_list(source_cols) }} - FROM deduplicated_numbered_source_prep - WHERE 1=1 - AND {{ ns.hdiff_alias }} <> prev_hashdiff OR prev_hashdiff IS NULL - {% if is_incremental() -%} - AND rn = 1 - {%- endif %} + FROM source_data redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from + QUALIFY + CASE + WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) THEN FALSE + ELSE TRUE + END ), {# @@ -128,7 +104,7 @@ records_to_insert AS ( AND {{ datavault4dbt.multikey(ref_key, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }} {% endfor %} AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }} - ) + AND deduplicated_numbered_source.rn = 1) {%- endif %} ) diff --git a/macros/tables/redshift/sat_v0.sql b/macros/tables/redshift/sat_v0.sql index e04d4f66..8833c635 100644 --- a/macros/tables/redshift/sat_v0.sql +++ b/macros/tables/redshift/sat_v0.sql @@ -42,24 +42,14 @@ source_data AS ( {# Get the latest record for each parent hashkey in existing sat, if incremental. #} {%- if is_incremental() %} -latest_entries_in_sat_prep AS ( - - SELECT - {{ parent_hashkey }}, - {{ ns.hdiff_alias }}, - ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }} DESC) as rn - FROM - {{ this }} -), - latest_entries_in_sat AS ( SELECT {{ parent_hashkey }}, {{ ns.hdiff_alias }} FROM - latest_entries_in_sat_prep - WHERE rn = 1 + {{ this }} redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from + QUALIFY ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }} DESC) = 1 ), {%- endif %} @@ -67,7 +57,7 @@ latest_entries_in_sat AS ( Deduplicate source by comparing each hashdiff to the hashdiff of the previous record, for each hashkey. Additionally adding a row number based on that order, if incremental. #} -deduplicated_numbered_source_prep AS ( +deduplicated_numbered_source AS ( SELECT {{ parent_hashkey }}, @@ -76,23 +66,12 @@ deduplicated_numbered_source_prep AS ( {% if is_incremental() -%} , ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) as rn {%- endif %} - , LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }}) as prev_hashdiff - FROM source_data - -), - -deduplicated_numbered_source AS ( - - SELECT - {{ parent_hashkey }}, - {{ ns.hdiff_alias }}, - {{ datavault4dbt.print_list(source_cols) }} - FROM deduplicated_numbered_source_prep - WHERE 1=1 - AND {{ ns.hdiff_alias }} <> prev_hashdiff OR prev_hashdiff IS NULL - {% if is_incremental() -%} - AND rn = 1 - {%- endif %} + FROM source_data redshift_requires_an_alias_if_the_qualify_is_directly_after_the_from + QUALIFY + CASE + WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE + ELSE TRUE + END ), {# @@ -111,7 +90,8 @@ records_to_insert AS ( SELECT 1 FROM latest_entries_in_sat WHERE {{ datavault4dbt.multikey(parent_hashkey, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }} - AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }}) + AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }} + AND deduplicated_numbered_source.rn = 1) {%- endif %} )