diff --git a/macros/hooks/clean_up_pits.sql b/macros/hooks/clean_up_pits.sql index decc7a04..02654adc 100644 --- a/macros/hooks/clean_up_pits.sql +++ b/macros/hooks/clean_up_pits.sql @@ -123,4 +123,15 @@ WHERE snap.{{ sdts }} IS NULL {{ log("PIT " ~ this ~ " successfully cleaned!", True) }} {%- endif -%} +{%- endmacro -%} + +{%- macro databricks__clean_up_pit(snapshot_relation, snapshot_trigger_column, sdts) -%} + +DELETE FROM {{ this }} pit +WHERE pit.{{ sdts }} not in (SELECT {{ sdts }} FROM {{ ref(snapshot_relation) }} snap WHERE {{ snapshot_trigger_column }}=TRUE) + +{%- if execute -%} +{{ log("PIT " ~ this ~ " successfully cleaned!", True) }} +{%- endif -%} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/internal/metadata_processing/escape_column_names.sql b/macros/internal/metadata_processing/escape_column_names.sql index 642b78a2..35501b2c 100644 --- a/macros/internal/metadata_processing/escape_column_names.sql +++ b/macros/internal/metadata_processing/escape_column_names.sql @@ -184,4 +184,16 @@ {%- do return(escaped_column_name) -%} +{%- endmacro -%} + + +{%- macro databricks__escape_column_name(column) -%} + + {%- set escape_char_left = var('escape_char_left', "") -%} + {%- set escape_char_right = var('escape_char_right', "") -%} + + {%- set escaped_column_name = escape_char_left ~ column | upper | replace(escape_char_left, '') | replace(escape_char_right, '') | trim ~ escape_char_right | indent(4) -%} + + {%- do return(escaped_column_name) -%} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/staging/databricks/stage.sql b/macros/staging/databricks/stage.sql new file mode 100644 index 00000000..fc76044b --- /dev/null +++ b/macros/staging/databricks/stage.sql @@ -0,0 +1,608 @@ +{# This is the databricks version of the stage macro, designed for Azure Databricks. #} + +{%- macro databricks__stage(include_source_columns, + ldts, + rsrc, + source_model, + hashed_columns, + derived_columns, + sequence, + prejoined_columns, + missing_columns, + multi_active_config, + enable_ghost_records) -%} + +{% if (source_model is none) and execute %} + + {%- set error_message -%} + Staging error: Missing source_model configuration. A source model name must be provided. + e.g. + [REF STYLE] + source_model: model_name + OR + [SOURCES STYLE] + source_model: + source_name: source_table_name + {%- endset -%} + + {{- exceptions.raise_compiler_error(error_message) -}} +{%- endif -%} + +{{ log('source_model: ' ~ source_model, false )}} + +{#- Check for source format or ref format and create relation object from source_model -#} +{% if source_model is mapping and source_model is not none -%} + + {%- set source_name = source_model | first -%} + {%- set source_table_name = source_model[source_name] -%} + + {%- set source_relation = source(source_name, source_table_name) -%} + {%- set all_source_columns = datavault4dbt.source_columns(source_relation=source_relation) -%} + +{%- elif source_model is not mapping and source_model is not none -%} + + {{ log('source_model is not mapping and not none: ' ~ source_model, false) }} + + {%- set source_relation = ref(source_model) -%} + {%- set all_source_columns = datavault4dbt.source_columns(source_relation=source_relation) -%} +{%- else -%} + {%- set all_source_columns = [] -%} +{%- endif -%} + +{{ log('source_relation: ' ~ source_relation, false) }} + +{# Setting the column name for load date timestamp and record source to the alias coming from the attributes #} +{%- set ldts_alias = var('datavault4dbt.ldts_alias', 'ldts') -%} +{%- set rsrc_alias = var('datavault4dbt.rsrc_alias', 'rsrc') -%} +{%- set copy_input_columns = var('datavault4dbt.copy_rsrc_ldts_input_columns', false) -%} +{%- set load_datetime_col_name = ldts_alias -%} +{%- set record_source_col_name = rsrc_alias -%} + +{%- set ldts_rsrc_input_column_names = [] -%} +{%- if datavault4dbt.is_attribute(ldts) -%} + {%- if not copy_input_columns -%} + {%- set ldts_rsrc_input_column_names = ldts_rsrc_input_column_names + [ldts] -%} + {%- else -%} + + {%- if ldts|lower == ldts_alias|lower -%} + {%- set ldts_rsrc_input_column_names = ldts_rsrc_input_column_names + [ldts] -%} + {%- endif -%} + + {%- endif %} + +{%- endif -%} + +{%- if datavault4dbt.is_attribute(rsrc) -%} + + {%- if not copy_input_columns -%} + {%- set ldts_rsrc_input_column_names = ldts_rsrc_input_column_names + [rsrc] -%} + {%- else -%} + + {%- if rsrc|lower == rsrc_alias|lower -%} + {%- set ldts_rsrc_input_column_names = ldts_rsrc_input_column_names + [rsrc] -%} + {%- endif -%} + + {%- endif -%} + +{%- endif %} + +{%- if datavault4dbt.is_something(sequence) -%} + {%- set ldts_rsrc_input_column_names = ldts_rsrc_input_column_names + [sequence] -%} +{%- endif -%} + +{%- set ldts = datavault4dbt.as_constant(ldts) -%} +{%- set rsrc = datavault4dbt.as_constant(rsrc) -%} + +{# Getting the column names for all additional columns #} +{%- set derived_column_names = datavault4dbt.extract_column_names(derived_columns) -%} +{%- set hashed_column_names = datavault4dbt.extract_column_names(hashed_columns) -%} +{%- set prejoined_column_names = datavault4dbt.extract_column_names(prejoined_columns) -%} +{%- set missing_column_names = datavault4dbt.extract_column_names(missing_columns) -%} +{%- set exclude_column_names = hashed_column_names + prejoined_column_names + missing_column_names + ldts_rsrc_input_column_names %} +{%- set source_and_derived_column_names = (all_source_columns + derived_column_names) | unique | list -%} +{%- set all_columns = adapter.get_columns_in_relation( source_relation ) -%} +{%- set columns_without_excluded_columns = [] -%} +{%- set final_columns_to_select = [] -%} + + +{%- if include_source_columns -%} + {%- set source_columns_to_select = datavault4dbt.process_columns_to_select(all_source_columns, exclude_column_names) | list -%} + + {%- for column in all_columns -%} + + {%- if column.name|lower not in exclude_column_names|map('lower') %} + {%- do columns_without_excluded_columns.append(column) -%} + {%- endif -%} + + {%- endfor -%} +{%- else -%} + + {# Include from the source only the input columns needed #} + {# Getting the input columns for the additional columns #} + {%- set derived_input_columns = datavault4dbt.extract_input_columns(derived_columns) -%} + {%- set hashed_input_columns = datavault4dbt.expand_column_list(datavault4dbt.extract_input_columns(hashed_columns)) -%} + {%- set hashed_input_columns = datavault4dbt.process_columns_to_select(hashed_input_columns, derived_column_names) -%} {# Excluding the names of the derived columns. #} + {%- set hashed_input_columns = datavault4dbt.process_columns_to_select(hashed_input_columns, prejoined_column_names) -%} {# Excluding the names of the prejoined columns. #} + {%- set hashed_input_columns = datavault4dbt.process_columns_to_select(hashed_input_columns, missing_column_names) -%} {# Excluding the names of the missing columns. #} + {%- set prejoined_input_columns = datavault4dbt.extract_input_columns(prejoined_columns) -%} + + {% if datavault4dbt.is_something(multi_active_config) %} + + {%- if datavault4dbt.is_list(multi_active_config['multi_active_key']) -%} + + {%- set ma_keys = multi_active_config['multi_active_key'] -%} + + {%- else -%} + + {%- set ma_keys = [multi_active_config['multi_active_key']] -%} + + {%- endif -%} + + {%- set only_include_from_source = (derived_input_columns + hashed_input_columns + prejoined_input_columns + ma_keys) | unique | list -%} + + {%- else -%} + + {%- set only_include_from_source = (derived_input_columns + hashed_input_columns + prejoined_input_columns) | unique | list -%} + + {%- endif -%} + + {%- set source_columns_to_select = only_include_from_source -%} + +{%- endif-%} + +{%- set final_columns_to_select = final_columns_to_select + source_columns_to_select -%} +{%- set derived_columns_to_select = datavault4dbt.process_columns_to_select(source_and_derived_column_names, hashed_column_names) | unique | list -%} + +{%- if datavault4dbt.is_something(derived_columns) %} + {#- Getting Data types for derived columns with detection from source relation -#} + {%- set derived_columns_with_datatypes = datavault4dbt.derived_columns_datatypes(derived_columns, source_relation) -%} + {%- set derived_columns_with_datatypes_DICT = fromjson(derived_columns_with_datatypes) -%} +{%- endif -%} +{#- Select hashing algorithm -#} + +{#- Setting unknown and error keys with default values for the selected hash algorithm -#} +{%- set hash = datavault4dbt.hash_method() -%} +{%- set hash_dtype = var('datavault4dbt.hash_datatype', 'STRING') -%} +{%- set hash_default_values = fromjson(datavault4dbt.hash_default_values(hash_function=hash,hash_datatype=hash_dtype)) -%} +{%- set hash_alg = hash_default_values['hash_alg'] -%} +{%- set unknown_key = hash_default_values['unknown_key'] -%} +{%- set error_key = hash_default_values['error_key'] -%} + +{# Select timestamp and format variables #} +{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%} +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{# Setting the error/unknown value for the record source for the ghost records#} +{% set error_value_rsrc = var('datavault4dbt.default_error_rsrc', 'ERROR') %} +{% set unknown_value_rsrc = var('datavault4dbt.default_unknown_rsrc', 'SYSTEM') %} + +{# Setting the rsrc default datatype #} +{% set rsrc_default_dtype = datavault4dbt.string_default_dtype(type=rsrc) %} + +WITH + +{# Selecting everything that we need from the source relation. #} +source_data AS ( + SELECT + + {{- "\n\n " ~ datavault4dbt.print_list(datavault4dbt.escape_column_names(all_source_columns)) if all_source_columns else " *" }} + + FROM {{ source_relation }} + + {% if is_incremental() %} + WHERE {{ ldts }} > (SELECT max({{ load_datetime_col_name}}) + FROM {{ this }} + WHERE {{ load_datetime_col_name}} != {{ datavault4dbt.string_to_timestamp(timestamp_format , end_of_all_times) }} ) + {%- endif -%} + + {% set last_cte = "source_data" -%} +), + + +{% set alias_columns = [load_datetime_col_name, record_source_col_name] %} + +{# Selecting all columns from the source data, renaming load date and record source to global aliases #} +ldts_rsrc_data AS ( + + SELECT + {{ ldts }} AS {{ load_datetime_col_name}}, + CAST( {{ rsrc }} as {{ rsrc_default_dtype }} ) AS {{ record_source_col_name }} + {%- if datavault4dbt.is_something(sequence) %}, + {{ sequence }} AS edwSequence + {%- set alias_columns = alias_columns + ['edwSequence'] -%} + {% endif -%} + + {%- if source_columns_to_select is not none and source_columns_to_select | length > 0 %}, + {{ datavault4dbt.print_list(datavault4dbt.escape_column_names(source_columns_to_select)) }} + {% endif -%} + {{"\n"}} + FROM {{ last_cte }} + + {%- set last_cte = "ldts_rsrc_data" -%} + {%- set final_columns_to_select = alias_columns + final_columns_to_select %} + {%- set final_columns_to_select = datavault4dbt.process_columns_to_select(final_columns_to_select, derived_column_names) | list -%} + + {%- set columns_without_excluded_columns_tmp = [] -%} + {%- for column in columns_without_excluded_columns -%} + {%- if column.name | lower not in derived_column_names | map('lower') -%} + {%- do columns_without_excluded_columns_tmp.append(column) -%} + {%- endif -%} + {%- endfor -%} + {%- set columns_without_excluded_columns = columns_without_excluded_columns_tmp |list -%} +), + +{%- if datavault4dbt.is_something(missing_columns) %} + +{# Filling missing columns with NULL values for schema changes #} +missing_columns AS ( + + SELECT + + {% if final_columns_to_select | length > 0 -%} + {{ datavault4dbt.print_list(datavault4dbt.escape_column_names(final_columns_to_select)) }}, + {%- endif %} + {%- for col, dtype in missing_columns.items() %} + CAST(NULL as {{ dtype }}) as {{ col }}{% if not loop.last %},{% endif -%} + + {% endfor %} + + FROM {{ last_cte }} + {%- set last_cte = "missing_columns" -%} + {%- set final_columns_to_select = final_columns_to_select + missing_column_names %} +), +{%- endif -%} + +{%- if datavault4dbt.is_something(prejoined_columns) %} +{# Prejoining Business Keys of other source objects for Link purposes #} +prejoined_columns AS ( + + SELECT + {% if final_columns_to_select | length > 0 -%} + {{ datavault4dbt.print_list(datavault4dbt.prefix(columns=datavault4dbt.escape_column_names(final_columns_to_select), prefix_str='lcte').split(',')) }} + {% endif %} + {%- for col, vals in prejoined_columns.items() -%} + ,pj_{{loop.index}}.{{ vals['bk'] }} AS {{ col }} + {% endfor -%} + + FROM {{ last_cte }} lcte + + {% for col, vals in prejoined_columns.items() %} + + {%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%} + {%- set relation = source(vals['src_name']|string, vals['src_table']) -%} + {%- elif 'ref_model' in vals.keys() -%} + {%- set relation = ref(vals['ref_model']) -%} + {%- else -%} + {%- set error_message -%} + Prejoin error: Invalid target entity definition. Allowed are: + e.g. + [REF STYLE] + extracted_column_alias: + ref_model: model_name + bk: extracted_column_name + this_column_name: join_columns_in_this_model + ref_column_name: join_columns_in_ref_model + OR + [SOURCES STYLE] + extracted_column_alias: + src_name: name_of_ref_source + src_table: name_of_ref_table + bk: extracted_column_name + this_column_name: join_columns_in_this_model + ref_column_name: join_columns_in_ref_model + + Got: + {{ col }}: {{ vals }} + {%- endset -%} + + {%- do exceptions.raise_compiler_error(error_message) -%} + {%- endif -%} + +{# This sets a default value for the operator that connects multiple joining conditions. Only when it is not set by user. #} + {%- if 'operator' not in vals.keys() -%} + {%- set operator = 'AND' -%} + {%- else -%} + {%- set operator = vals['operator'] -%} + {%- endif -%} + + {%- set prejoin_alias = 'pj_' + loop.index|string -%} + + left join {{ relation }} as {{ prejoin_alias }} + on {{ datavault4dbt.multikey(columns=vals['this_column_name'], prefix=['lcte', prejoin_alias], condition='=', operator=operator, right_columns=vals['ref_column_name']) }} + + {% endfor %} + + {% set last_cte = "prejoined_columns" -%} + {%- set final_columns_to_select = final_columns_to_select + prejoined_column_names %} +), +{%- endif -%} + + +{%- if datavault4dbt.is_something(derived_columns) %} +{# Adding derived columns to the selection #} +derived_columns AS ( + + {%- set final_columns_to_select = datavault4dbt.process_columns_to_select(final_columns_to_select, derived_column_names) -%} + + SELECT + + {% if final_columns_to_select | length > 0 -%} + {{ datavault4dbt.print_list(datavault4dbt.escape_column_names(final_columns_to_select)) }}, + {% endif %} + {{ datavault4dbt.derive_columns(columns=derived_columns) | indent(4) }} + + FROM {{ last_cte }} + {%- set last_cte = "derived_columns" -%} + {%- set final_columns_to_select = final_columns_to_select + derived_column_names %} +), +{%- endif -%} + +{%- if datavault4dbt.is_something(hashed_columns) and hashed_columns is mapping %} +{# Generating Hashed Columns (hashkeys and hashdiffs for Hubs/Links/Satellites) #} +{% if datavault4dbt.is_something(multi_active_config) %} + +{%- set tmp_ns = namespace(main_hashkey_dict={}, remaining_hashed_columns={}, hashdiff_names=[], hashdiff_dict={}) -%} + +{%- for column in hashed_columns.keys() -%} + {%- if column == multi_active_config['main_hashkey_column'] and not hashed_columns[column].is_hashdiff -%} + {%- do tmp_ns.main_hashkey_dict.update({column: hashed_columns[column]}) -%} + {% elif column != multi_active_config['main_hashkey_column'] and not hashed_columns[column].is_hashdiff -%} + {%- do tmp_ns.remaining_hashed_columns.update({column: hashed_columns[column]}) -%} + {%- elif hashed_columns[column].is_hashdiff -%} + {%- do tmp_ns.hashdiff_names.append(column) -%} + {%- do tmp_ns.hashdiff_dict.update({column: hashed_columns[column]}) -%} + {%- endif -%} +{%- endfor -%} + +main_hashkey_generation AS ( + + SELECT + {{ datavault4dbt.print_list(datavault4dbt.escape_column_names(final_columns_to_select)) }}, + {% set processed_hash_columns = datavault4dbt.process_hash_column_excludes(tmp_ns.main_hashkey_dict) -%} + {{- datavault4dbt.hash_columns(columns=processed_hash_columns) | indent(4) }} + FROM {{ last_cte }} + +), + +{# Hash calculation for multi-active source data. #} +ma_hashdiff_prep AS ( + + SELECT + + {% set processed_hash_columns = datavault4dbt.process_hash_column_excludes(tmp_ns.hashdiff_dict) -%} + + {{ multi_active_config['main_hashkey_column'] }}, + {# Generates only all hashdiffs. #} + {{- datavault4dbt.hash_columns(columns=processed_hash_columns, multi_active_key=multi_active_config['multi_active_key'], main_hashkey_column=multi_active_config['main_hashkey_column']) | indent(4) }}, + {{ ldts_alias }} + + FROM main_hashkey_generation + GROUP BY {{ multi_active_config['main_hashkey_column'] }}, {{ ldts_alias }} + + {% do processed_hash_columns.update(datavault4dbt.process_hash_column_excludes(tmp_ns.main_hashkey_dict)) -%} {# Add main hashkey to list of processed columns, otherwise ghost records dont get created #} + +), + +hashed_columns AS ( + + SELECT + + {{ datavault4dbt.alias_all(columns=final_columns_to_select, prefix='main_hashkey_generation') }}, {# Everything from last_cte before hashed_columns. #} + {% set processed_remaining_hash_columns = datavault4dbt.process_hash_column_excludes(tmp_ns.remaining_hashed_columns) -%} + {# Generates only all remaining hashkeys, that are no hashdiffs #} + + {%- if datavault4dbt.is_something(processed_remaining_hash_columns) %} + {{- datavault4dbt.hash_columns(columns=processed_remaining_hash_columns) | indent(4) }}, {# All remaining hashed_columns get calculated. #} + {%- do processed_hash_columns.update(datavault4dbt.process_hash_column_excludes(tmp_ns.remaining_hashed_columns)) -%} {# All remaining hashed_columns get calculated. #} + {% endif -%} + + {{ datavault4dbt.print_list(datavault4dbt.escape_column_names(tmp_ns.hashdiff_names)) }}, {# All MA Hashdiffs are selected. #} + main_hashkey_generation.{{ multi_active_config['main_hashkey_column'] }} {# Main Hashkey selected. #} + + FROM main_hashkey_generation + LEFT JOIN ma_hashdiff_prep + ON main_hashkey_generation.{{ multi_active_config['main_hashkey_column'] }} = ma_hashdiff_prep.{{ multi_active_config['main_hashkey_column'] }} + AND main_hashkey_generation.{{ ldts_alias }} = ma_hashdiff_prep.{{ ldts_alias }} + + {%- set last_cte = "hashed_columns" -%} + {%- set final_columns_to_select = final_columns_to_select + hashed_column_names %} + +), + +{% else %} + +{# Hash calculation for single-active source data. #} +hashed_columns AS ( + + SELECT + + {% if final_columns_to_select | length > 0 -%} + {{ datavault4dbt.print_list(datavault4dbt.escape_column_names(final_columns_to_select)) }}, + {% endif %} + + {%- set processed_hash_columns = datavault4dbt.process_hash_column_excludes(hashed_columns) -%} + {{ datavault4dbt.hash_columns(columns=processed_hash_columns) | indent(4) }} + + FROM {{ last_cte }} + {%- set last_cte = "hashed_columns" -%} + {%- set final_columns_to_select = final_columns_to_select + hashed_column_names %} + +), + +{%- endif -%} +{%- endif -%} + +{% if enable_ghost_records and not is_incremental() %} +{# Creating Ghost Record for unknown case, based on datatype #} +unknown_values AS ( + + SELECT + + {{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }} as {{ load_datetime_col_name }}, + '{{ unknown_value_rsrc }}' as {{ record_source_col_name }} + + {%- if columns_without_excluded_columns is defined and columns_without_excluded_columns| length > 0 -%}, + {# Generating Ghost Records for all source columns, except the ldts, rsrc & edwSequence column #} + {%- for column in columns_without_excluded_columns %} + {{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='unknown') }} + {%- if not loop.last %},{% endif -%} + {%- endfor -%} + + {%- endif -%} + + {%- if datavault4dbt.is_something(missing_columns) -%}, + {# Additionally generating ghost record for missing columns #} + {%- for col, dtype in missing_columns.items() %} + {{ datavault4dbt.ghost_record_per_datatype(column_name=col, datatype=dtype, ghost_record_type='unknown') }} + {%- if not loop.last %},{% endif -%} + {%- endfor -%} + {%- endif -%} + + {%- if datavault4dbt.is_something(prejoined_columns) -%}, + {# Additionally generating ghost records for the prejoined attributes#} + {% for col, vals in prejoined_columns.items() %} + + {%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%} + {%- set relation = source(vals['src_name']|string, vals['src_table']) -%} + {%- elif 'ref_model' in vals.keys() -%} + {%- set relation = ref(vals['ref_model']) -%} + {%- endif -%} + + {%- set pj_relation_columns = adapter.get_columns_in_relation( relation ) -%} + {{ log('pj_relation_columns: ' ~ pj_relation_columns, false ) }} + + {% for column in pj_relation_columns -%} + + {% if column.name|lower == vals['bk']|lower -%} + {{ log('column found? yes, for column :' ~ column.name , false) }} + {{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='unknown', alias=col) }} + {%- endif -%} + + {%- endfor -%} + {%- if not loop.last %},{% endif %} + {% endfor -%} + {%- endif %} + + {%- if datavault4dbt.is_something(derived_columns) -%}, + {# Additionally generating Ghost Records for Derived Columns #} + {%- for column_name, properties in derived_columns_with_datatypes_DICT.items() %} + {{ datavault4dbt.ghost_record_per_datatype(column_name=column_name, datatype=properties.datatype, ghost_record_type='unknown') }} + {%- if not loop.last %},{% endif -%} + {%- endfor -%} + + {%- endif -%} + + {%- if datavault4dbt.is_something(processed_hash_columns) -%}, + + {%- for hash_column in processed_hash_columns %} + CAST({{ datavault4dbt.as_constant(column_str=unknown_key) }} as {{ hash_dtype }}) as {{ hash_column }} + {%- if not loop.last %},{% endif %} + {%- endfor -%} + + {%- endif -%} + {{-"\n"-}} +), + +{# Creating Ghost Record for error case, based on datatype #} +error_values AS ( + + SELECT + + {{ datavault4dbt.string_to_timestamp(timestamp_format , end_of_all_times) }} as {{ load_datetime_col_name }}, + '{{ error_value_rsrc }}' as {{ record_source_col_name }} + + {%- if columns_without_excluded_columns is defined and columns_without_excluded_columns| length > 0 -%}, + {# Generating Ghost Records for Source Columns #} + {%- for column in columns_without_excluded_columns %} + {{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='error') }} + {%- if not loop.last %},{% endif -%} + {%- endfor -%} + + {%- endif -%} + + {%- if datavault4dbt.is_something(missing_columns) -%}, + {# Additionally generating ghost record for Missing columns #} + {%- for col, dtype in missing_columns.items() %} + {{ datavault4dbt.ghost_record_per_datatype(column_name=col, datatype=dtype, ghost_record_type='error') }} + {%- if not loop.last %},{% endif -%} + {%- endfor -%} + {%- endif -%} + + {%- if datavault4dbt.is_something(prejoined_columns) -%}, + {# Additionally generating ghost records for the prejoined attributes #} + {%- for col, vals in prejoined_columns.items() %} + + {%- if 'src_name' in vals.keys() or 'src_table' in vals.keys() -%} + {%- set relation = source(vals['src_name']|string, vals['src_table']) -%} + {%- elif 'ref_model' in vals.keys() -%} + {%- set relation = ref(vals['ref_model']) -%} + {%- endif -%} + + {%- set pj_relation_columns = adapter.get_columns_in_relation( relation ) -%} + + {% for column in pj_relation_columns -%} + {% if column.name|lower == vals['bk']|lower -%} + {{ datavault4dbt.ghost_record_per_datatype(column_name=column.name, datatype=column.dtype, ghost_record_type='error', alias=col) -}} + {%- endif -%} + {%- endfor -%} + {%- if not loop.last -%},{%- endif %} + {% endfor -%} + + {%- endif -%} + + {%- if datavault4dbt.is_something(derived_columns) %}, + {# Additionally generating Ghost Records for Derived Columns #} + {%- for column_name, properties in derived_columns_with_datatypes_DICT.items() %} + {{ datavault4dbt.ghost_record_per_datatype(column_name=column_name, datatype=properties.datatype, ghost_record_type='error') }} + {%- if not loop.last %},{% endif %} + {%- endfor -%} + + {%- endif -%} + + {%- if datavault4dbt.is_something(processed_hash_columns) -%}, + + {%- for hash_column in processed_hash_columns %} + CAST({{ datavault4dbt.as_constant(column_str=error_key) }} as {{ hash_dtype }}) as {{ hash_column }} + {%- if not loop.last %},{% endif %} + {%- endfor -%} + + {%- endif -%} + {{- "\n" -}} +), + +{# Combining all previous ghost record calculations to two rows with the same width as regular entries #} +ghost_records AS ( + SELECT * FROM unknown_values + UNION ALL + SELECT * FROM error_values +), +{%- endif -%} + +{%- if not include_source_columns -%} + {% set final_columns_to_select = datavault4dbt.process_columns_to_select(columns_list=final_columns_to_select, exclude_columns_list=source_columns_to_select) %} +{%- endif -%} + +{# Combining the two ghost records with the regular data #} +columns_to_select AS ( + + SELECT + + {{ datavault4dbt.print_list(datavault4dbt.escape_column_names(final_columns_to_select)) }} + + FROM {{ last_cte }} + + {%- if enable_ghost_records and not is_incremental() %} + UNION ALL + + SELECT + + {{ datavault4dbt.print_list(datavault4dbt.escape_column_names(final_columns_to_select)) }} + + FROM ghost_records + {%- endif %} +) + +SELECT * FROM columns_to_select + +{%- endmacro -%} diff --git a/macros/supporting/beginning_of_all_times.sql b/macros/supporting/beginning_of_all_times.sql index f56d1955..b16d458a 100644 --- a/macros/supporting/beginning_of_all_times.sql +++ b/macros/supporting/beginning_of_all_times.sql @@ -177,4 +177,29 @@ {{ return(beginning_of_all_times) }} -{%- endmacro -%} \ No newline at end of file +{%- endmacro -%} + + +{%- macro databricks__beginning_of_all_times() %} + +{%- set global_var = var('datavault4dbt.beginning_of_all_times', none) -%} +{%- set beginning_of_all_times = '' -%} + +{%- if global_var is mapping -%} + {%- if 'databricks' in global_var.keys()|map('lower') -%} + {% set beginning_of_all_times = global_var['databricks'] %} + {%- else -%} + {%- if execute -%} + {%- do exceptions.warn("Warning: You have set the global variable 'datavault4dbt.beginning_of_all_times' to a dictionary, but have not included the adapter you use (databricks) as a key. Applying the default value.") -%} + {% endif %} + {%- set beginning_of_all_times = "0001-01-01 00:00:01" -%} + {% endif %} +{%- elif global_var is not mapping and datavault4dbt.is_something(global_var) -%} + {%- set beginning_of_all_times = global_var -%} +{%- else -%} + {%- set beginning_of_all_times = "0001-01-01 00:00:01" -%} +{%- endif -%} + +{{ return(beginning_of_all_times) }} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/beginning_of_all_times_date.sql b/macros/supporting/beginning_of_all_times_date.sql index 8f0bb4b6..f4f559a9 100644 --- a/macros/supporting/beginning_of_all_times_date.sql +++ b/macros/supporting/beginning_of_all_times_date.sql @@ -177,4 +177,29 @@ {{ return(beginning_of_all_times_date) }} +{%- endmacro -%} + + +{%- macro databricks__beginning_of_all_times_date() %} + +{%- set global_var = var('datavault4dbt.beginning_of_all_times_date', none) -%} +{%- set beginning_of_all_times_date = '' -%} + +{%- if global_var is mapping -%} + {%- if 'databricks' in global_var.keys()|map('lower') -%} + {% set beginning_of_all_times_date = global_var['databricks'] %} + {%- else -%} + {%- if execute -%} + {%- do exceptions.warn("Warning: You have set the global variable 'datavault4dbt.beginning_of_all_times_date' to a dictionary, but have not included the adapter you use (databricks) as a key. Applying the default value.") -%} + {% endif %} + {%- set beginning_of_all_times_date = "0001-01-01" -%} + {% endif %} +{%- elif global_var is not mapping and datavault4dbt.is_something(global_var) -%} + {%- set beginning_of_all_times_date = global_var -%} +{%- else -%} + {%- set beginning_of_all_times_date = "0001-01-01" -%} +{%- endif -%} + +{{ return(beginning_of_all_times_date) }} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/current_timestamp.sql b/macros/supporting/current_timestamp.sql index c342d1a1..db6f8027 100644 --- a/macros/supporting/current_timestamp.sql +++ b/macros/supporting/current_timestamp.sql @@ -28,4 +28,8 @@ {% macro fabric__current_timestamp_in_utc() %} {{return('sysutcdatetime()')}} +{% endmacro %} + +{% macro databricks__current_timestamp() %} + {{ return('current_timestamp()') }} {% endmacro %} \ No newline at end of file diff --git a/macros/supporting/date_format.sql b/macros/supporting/date_format.sql index 005a0cd5..72f7253a 100644 --- a/macros/supporting/date_format.sql +++ b/macros/supporting/date_format.sql @@ -176,4 +176,29 @@ {{ return(date_format) }} +{%- endmacro -%} + + +{%- macro databricks__date_format() %} + +{%- set global_var = var('datavault4dbt.date_format', none) -%} +{%- set date_format = '' -%} + +{%- if global_var is mapping -%} + {%- if 'databricks' in global_var.keys()|map('lower') -%} + {% set date_format = global_var['databricks'] %} + {%- else -%} + {%- if execute -%} + {%- do exceptions.warn("Warning: You have set the global variable 'datavault4dbt.date_format' to a dictionary, but have not included the adapter you use (databricks) as a key. Applying the default value.") -%} + {% endif %} + {%- set date_format = "yyyy-mm-dd" -%} + {% endif %} +{%- elif global_var is not mapping and datavault4dbt.is_something(global_var) -%} + {%- set date_format = global_var -%} +{%- else -%} + {%- set date_format = "yyyy-mm-dd" -%} +{%- endif -%} + +{{ return(date_format) }} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/end_of_all_times.sql b/macros/supporting/end_of_all_times.sql index bed9f61a..52dbad8e 100644 --- a/macros/supporting/end_of_all_times.sql +++ b/macros/supporting/end_of_all_times.sql @@ -174,3 +174,28 @@ {{ return(end_of_all_times) }} {%- endmacro -%} + + +{%- macro databricks__end_of_all_times() %} + +{%- set global_var = var('datavault4dbt.end_of_all_times', none) -%} +{%- set end_of_all_times = '' -%} + +{%- if global_var is mapping -%} + {%- if 'databricks' in global_var.keys()|map('lower') -%} + {% set end_of_all_times = global_var['databricks'] %} + {%- else -%} + {%- if execute -%} + {%- do exceptions.warn("Warning: You have set the global variable 'datavault4dbt.end_of_all_times' to a dictionary, but have not included the adapter you use (databricks) as a key. Applying the default value.") -%} + {% endif %} + {%- set end_of_all_times = "8888-12-31 23:59:59" -%} + {% endif %} +{%- elif global_var is not mapping and datavault4dbt.is_something(global_var) -%} + {%- set end_of_all_times = global_var -%} +{%- else -%} + {%- set end_of_all_times = "8888-12-31 23:59:59" -%} +{%- endif -%} + +{{ return(end_of_all_times) }} + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/end_of_all_times_date.sql b/macros/supporting/end_of_all_times_date.sql index 6c810323..8ab809a2 100644 --- a/macros/supporting/end_of_all_times_date.sql +++ b/macros/supporting/end_of_all_times_date.sql @@ -172,5 +172,29 @@ {%- set end_of_all_times_date = "8888-12-31" -%} {%- endif -%} +{{ return(end_of_all_times_date) }} +{%- endmacro -%} + + +{%- macro databricks__end_of_all_times_date() %} + +{%- set global_var = var('datavault4dbt.end_of_all_times_date', none) -%} +{%- set end_of_all_times_date = '' -%} + +{%- if global_var is mapping -%} + {%- if 'databricks' in global_var.keys()|map('lower') -%} + {% set end_of_all_times_date = global_var['databricks'] %} + {%- else -%} + {%- if execute -%} + {%- do exceptions.warn("Warning: You have set the global variable 'datavault4dbt.end_of_all_times_date' to a dictionary, but have not included the adapter you use (databricks) as a key. Applying the default value.") -%} + {% endif %} + {%- set end_of_all_times_date = "8888-12-31" -%} + {% endif %} +{%- elif global_var is not mapping and datavault4dbt.is_something(global_var) -%} + {%- set end_of_all_times_date = global_var -%} +{%- else -%} + {%- set end_of_all_times_date = "8888-12-31" -%} +{%- endif -%} + {{ return(end_of_all_times_date) }} {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/ghost_record_per_datatype.sql b/macros/supporting/ghost_record_per_datatype.sql index 4715a0cf..2cbc9e70 100644 --- a/macros/supporting/ghost_record_per_datatype.sql +++ b/macros/supporting/ghost_record_per_datatype.sql @@ -522,4 +522,54 @@ {%- endif %} {%- endif -%} +{%- endmacro -%} + + +{%- macro databricks__ghost_record_per_datatype(column_name, datatype, ghost_record_type, col_size, alias) -%} + +{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%} +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set beginning_of_all_times_date = datavault4dbt.beginning_of_all_times_date() -%} +{%- set end_of_all_times_date = datavault4dbt.end_of_all_times_date() -%} +{%- set date_format = datavault4dbt.date_format() -%} + +{%- set unknown_value__STRING = var('datavault4dbt.unknown_value__STRING', '(unknown)') -%} +{%- set error_value__STRING = var('datavault4dbt.error_value__STRING', '(error)') -%} +{%- set unknown_value__numeric = var('datavault4dbt.unknown_value__numeric', -1) -%} +{%- set error_value__numeric = var('datavault4dbt.error_value__numeric', -2) -%} + +{%- set hash = datavault4dbt.hash_method() -%} +{%- set hash_default_values = datavault4dbt.hash_default_values(hash_function=hash) -%} +{%- set unknown_value__HASHTYPE = hash_default_values['unknown_key'] -%} +{%- set error_value__HASHTYPE = hash_default_values['error_key'] -%} + +{%- set datatype = datatype | string | upper | trim -%} + +{%- if ghost_record_type == 'unknown' -%} + {%- if datatype == 'TIMESTAMP' %} {{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }} as {{ alias }} + {%- elif datatype == 'DATE'%} TO_DATE('{{ beginning_of_all_times_date }}', '{{ date_format }}' ) as {{ alias }} + {%- elif datatype == 'STRING' %} '{{unknown_value__STRING}}' as {{ alias }} + {%- elif datatype in ['INT', 'SMALLINT', 'TINYINT', 'BIGINT', 'DOUBLE', 'FLOAT'] %} CAST('{{unknown_value__numeric}}' as {{ datatype}}) as {{ alias }} + {%- elif datatype.upper().startswith('DECIMAL') %} CAST('{{unknown_value__numeric}}' as DECIMAL) as {{ alias }} + {%- elif datatype == 'BOOLEAN' %} CAST('FALSE' as BOOLEAN) as {{ alias }} + {%- elif datatype == 'BINARY' %} CAST('{{ unknown_value__HASHTYPE }}') as {{ alias }} + {%- else %} CAST(NULL as {{ datatype }}) as {{ alias }} + {% endif %} +{%- elif ghost_record_type == 'error' -%} + {%- if datatype == 'TIMESTAMP' %} {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} as {{ alias }} + {%- elif datatype == 'DATE'-%} TO_DATE('{{ end_of_all_times_date }}', '{{ date_format }}' ) as {{ alias }} + {%- elif datatype == 'STRING' %} '{{error_value__STRING}}' as {{ alias }} + {%- elif datatype in ['INT', 'SMALLINT', 'TINYINT', 'BIGINT', 'DOUBLE', 'FLOAT'] %} CAST('{{error_value__numeric}}' as {{ datatype}}) as {{ alias }} + {%- elif datatype.upper().startswith('DECIMAL') %} CAST('{{error_value__numeric}}' as DECIMAL) as {{ alias }} + {%- elif datatype == 'BOOLEAN' %} CAST('FALSE' as BOOLEAN) as {{ alias }} + {%- elif datatype == 'BINARY' %} CAST('{{ error_value__HASHTYPE }}') as {{ alias }} + {%- else %} CAST(NULL as {{ datatype }}) as {{ alias }} + {% endif %} +{%- else -%} + {%- if execute -%} + {{ exceptions.raise_compiler_error("Invalid Ghost Record Type. Accepted are 'unknown' and 'error'.") }} + {%- endif %} +{%- endif -%} {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/hash.sql b/macros/supporting/hash.sql index 799f09fa..c6ae74b3 100644 --- a/macros/supporting/hash.sql +++ b/macros/supporting/hash.sql @@ -439,4 +439,73 @@ {%- endfor -%} +{%- endmacro -%} + + +{%- macro databricks__hash(columns, alias, is_hashdiff, multi_active_key, main_hashkey_column) -%} + +{%- set hash = datavault4dbt.hash_method() -%} +{%- set concat_string = var('concat_string', '||') -%} +{%- set quote = var('quote', '"') -%} +{%- set null_placeholder_string = var('null_placeholder_string', '^^') -%} + +{%- set hashkey_input_case_sensitive = var('datavault4dbt.hashkey_input_case_sensitive', FALSE) -%} +{%- set hashdiff_input_case_sensitive = var('datavault4dbt.hashdiff_input_case_sensitive', TRUE) -%} + +{#- Select hashing algorithm -#} +{%- set hash_dtype = var('datavault4dbt.hash_datatype', 'STRING') -%} +{{ log('hash type in hash macro: ' ~ hash_dtype, false) }} +{%- set hash_default_values = fromjson(datavault4dbt.hash_default_values(hash_function=hash,hash_datatype=hash_dtype)) -%} +{%- set hash_alg = hash_default_values['hash_alg'] -%} +{%- set unknown_key = hash_default_values['unknown_key'] -%} +{%- set error_key = hash_default_values['error_key'] -%} + +{%- set attribute_standardise = datavault4dbt.attribute_standardise() %} + + +{#- If single column to hash -#} +{%- if columns is string -%} + {%- set columns = [columns] -%} +{%- endif -%} + +{%- set all_null = [] -%} + +{%- if is_hashdiff and datavault4dbt.is_something(multi_active_key) -%} + {%- set std_dict = fromjson(datavault4dbt.multi_active_concattenated_standardise(case_sensitive=hashdiff_input_case_sensitive, hash_alg=hash_alg, datatype=hash_dtype, alias=alias, zero_key=unknown_key, multi_active_key=multi_active_key, main_hashkey_column=main_hashkey_column)) -%} +{%- elif is_hashdiff -%} + {%- set std_dict = fromjson(datavault4dbt.concattenated_standardise(case_sensitive=hashdiff_input_case_sensitive, hash_alg=hash_alg, datatype=hash_dtype, alias=alias, zero_key=unknown_key)) -%} +{%- else -%} + {%- set std_dict = fromjson(datavault4dbt.concattenated_standardise(case_sensitive=hashkey_input_case_sensitive, hash_alg=hash_alg, datatype=hash_dtype, alias=alias, zero_key=unknown_key)) -%} +{%- endif -%} + + {%- set standardise_prefix = std_dict['standardise_prefix'] -%} + {%- set standardise_suffix = std_dict['standardise_suffix'] -%} + +{{ standardise_prefix }} + +{%- for column in columns -%} + + {%- do all_null.append(null_placeholder_string) -%} + + {%- if '.' in column %} + {% set column_str = column -%} + {%- else -%} + {%- set column_str = datavault4dbt.as_constant(column) -%} + {%- endif -%} + + {{- "\nIFNULL(({}), '{}')".format(attribute_standardise | replace('[EXPRESSION]', column_str) | replace('[QUOTE]', quote) | replace('[NULL_PLACEHOLDER_STRING]', null_placeholder_string), null_placeholder_string) | indent(4) -}} + {{- ",'{}',".format(concat_string) if not loop.last -}} + + {%- if loop.last -%} + + {{ standardise_suffix | replace('[ALL_NULL]', all_null | join("")) | indent(4) }} + + {%- else -%} + + {%- do all_null.append(concat_string) -%} + + {%- endif -%} + +{%- endfor -%} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/hash_standardization.sql b/macros/supporting/hash_standardization.sql index 41d00720..74dc9cf0 100644 --- a/macros/supporting/hash_standardization.sql +++ b/macros/supporting/hash_standardization.sql @@ -75,6 +75,12 @@ CONCAT('\"', REPLACE(REPLACE(REPLACE(TRIM(CAST([EXPRESSION] AS STRING)), '\\', ' {%- endmacro -%} +{%- macro databricks__attribute_standardise(hash_type) -%} + +CONCAT('\"', REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(TRIM(CAST([EXPRESSION] AS STRING)), r'\\', r'\\\\'), '[QUOTE]', '\"'), '[NULL_PLACEHOLDER_STRING]', '--'), '\"') + +{%- endmacro -%} + {%- macro concattenated_standardise(case_sensitive, hash_alg, datatype, zero_key, alias) -%} @@ -409,8 +415,50 @@ CONCAT('\"', REPLACE(REPLACE(REPLACE(TRIM(CAST([EXPRESSION] AS STRING)), '\\', ' {{ return(dict_result | tojson ) }} -{%- endmacro -%} +{%- endmacro -%} + + +{%- macro databricks__concattenated_standardise(case_sensitive, hash_alg, datatype, zero_key, alias) -%} + +{%- set dict_result = {} -%} + +{%- set zero_key = datavault4dbt.as_constant(column_str=zero_key) -%} + +{%- if datatype == 'STRING' -%} + + {%- if case_sensitive -%} + {%- set standardise_prefix = "IFNULL(LOWER({}(NULLIF(CAST(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(UPPER(CONCAT(".format(hash_alg)-%} + {%- if alias is not none -%} + {%- set standardise_suffix = "\n)), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]'))), {}) AS {}".format(zero_key, alias)-%} + {%- else -%} + {%- set standardise_suffix = "\n)), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]')))), {})".format(zero_key)-%} + {%- endif -%} + {%- else -%} + {%- set standardise_prefix = "IFNULL(LOWER({}(NULLIF(CAST(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(CONCAT(".format(hash_alg)-%} + {%- set standardise_suffix = "\n), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]'))), {}) AS {}".format(zero_key, alias)-%} + {%- endif -%} + +{%- else -%} + {%- if case_sensitive -%} + {%- set standardise_prefix = "IFNULL({}(NULLIF(CAST(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(UPPER(CONCAT(".format(hash_alg)-%} + {%- if alias is not none -%} + {%- set standardise_suffix = "\n)), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]')), CAST({} AS {})) AS {}".format(zero_key, datatype, alias)-%} + {%- else -%} + {%- set standardise_suffix = "\n)), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]')), CAST({} AS {}))".format(zero_key, datatype)-%} + {%- endif -%} + {%- else -%} + {%- set standardise_prefix = "IFNULL({}(NULLIF(CAST(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(CONCAT(".format(hash_alg)-%} + {%- set standardise_suffix = "\n), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]')), CAST({} AS {})) AS {}".format(zero_key, datatype, alias)-%} + {%- endif -%} + +{%- endif -%} + +{%- do dict_result.update({"standardise_suffix": standardise_suffix, "standardise_prefix": standardise_prefix }) -%} + +{{ return(dict_result | tojson ) }} + +{%- endmacro -%} @@ -822,4 +870,62 @@ CONCAT('\"', REPLACE(REPLACE(REPLACE(TRIM(CAST([EXPRESSION] AS STRING)), '\\', ' {{ return(dict_result | tojson ) }} +{%- endmacro -%} + + +{%- macro databricks__multi_active_concattenated_standardise(case_sensitive, hash_alg, datatype, zero_key, alias, multi_active_key, main_hashkey_column) -%} +{%- set dict_result = {} -%} + +{%- set zero_key = datavault4dbt.as_constant(column_str=zero_key) -%} + +{%- if datavault4dbt.is_list(multi_active_key) -%} + {%- set multi_active_key = multi_active_key|join(", ") -%} +{%- endif -%} + +{%- if datatype == 'STRING' -%} + + {%- if case_sensitive -%} + {%- set standardise_prefix = "IFNULL(LOWER({}(ARRAY_JOIN(SORT_ARRAY(ARRAY_AGG(NULLIF(CAST(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(UPPER(CONCAT(".format(hash_alg)-%} + + {%- if alias is not none -%} + {%- set standardise_suffix = "\n)), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]'))),','))), {}) AS {}".format(zero_key, alias)-%} + {%- else -%} + {%- set standardise_suffix = "\n)), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]'))),','))), {})".format(zero_key)-%} + {%- endif -%} + {%- else -%} + {%- set standardise_prefix = "IFNULL(LOWER({}(ARRAY_JOIN(SORT_ARRAY(ARRAY_AGG(NULLIF(CAST(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(CONCAT(".format(hash_alg) -%} + + {%- if alias is not none -%} + {%- set standardise_suffix = "\n), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]'))),','))), {}) AS {}".format(zero_key, alias)-%} + {%- else -%} + {%- set standardise_suffix = "\n), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]'))),','))), {})".format(zero_key)-%} + {%- endif -%} + {%- endif -%} + +{%- else -%} + + {%- if case_sensitive -%} + {%- set standardise_prefix = "IFNULL({}(ARRAY_JOIN(SORT_ARRAY(ARRAY_AGG(NULLIF(CAST(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(UPPER(CONCAT(".format(hash_alg)-%} + + {%- if alias is not none -%} + {%- set standardise_suffix = "\n)), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]'))),',')), {}) AS {}".format(zero_key, alias)-%} + {%- else -%} + {%- set standardise_suffix = "\n)), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]'))),',')), {})".format(zero_key)-%} + {%- endif -%} + {%- else -%} + {%- set standardise_prefix = "IFNULL({}(ARRAY_JOIN(SORT_ARRAY(ARRAY_AGG(NULLIF(CAST(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(REGEXP_REPLACE(CONCAT(".format(hash_alg)-%} + + {%- if alias is not none -%} + {%- set standardise_suffix = "\n), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]'))),',')), {}) AS {}".format(zero_key, alias)-%} + {%- else -%} + {%- set standardise_suffix = "\n), r'\\n', '') \n, r'\\t', '') \n, r'\\v', '') \n, r'\\r', '') AS STRING), '[ALL_NULL]'))),',')), {})".format(zero_key)-%} + {%- endif -%} + {%- endif -%} + +{%- endif -%} + +{%- do dict_result.update({"standardise_suffix": standardise_suffix, "standardise_prefix": standardise_prefix }) -%} + +{{ return(dict_result | tojson ) }} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/string_default_dtype.sql b/macros/supporting/string_default_dtype.sql index 4c2c3e3a..ecc44477 100644 --- a/macros/supporting/string_default_dtype.sql +++ b/macros/supporting/string_default_dtype.sql @@ -240,4 +240,38 @@ {{ return(string_default_dtype) }} +{%- endmacro -%} + + +{%- macro databricks__string_default_dtype(type) %} + +{%- if type == 'rsrc' %} + {%- set global_var = var('datavault4dbt.rsrc_default_dtype', none) -%} +{%- elif type == 'stg' %} + {%- set global_var = var('datavault4dbt.stg_default_dtype', none) -%} +{%- elif type == 'derived_columns' %} + {%- set global_var = var('datavault4dbt.derived_columns_default_dtype', none) -%} +{%- else %} + {%- set global_var = none %} +{%- endif %} + +{%- set string_default_dtype = '' -%} + +{%- if global_var is mapping -%} + {%- if 'databricks' in global_var.keys()|map('lower') -%} + {% set string_default_dtype = global_var['databricks'] %} + {%- else -%} + {%- if execute -%} + {%- do exceptions.warn("Warning: You have set the global variable 'datavault4dbt." ~ type ~ "_default_dtype' to a dictionary, but have not included the adapter you use (databricks) as a key. Applying the default value.") -%} + {% endif %} + {%- set string_default_dtype = "STRING" -%} + {% endif %} +{%- elif global_var is not mapping and datavault4dbt.is_something(global_var) -%} + {%- set string_default_dtype = global_var -%} +{%- else -%} + {%- set string_default_dtype = "STRING" -%} +{%- endif -%} + +{{ return(string_default_dtype) }} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/supporting/string_to_timestamp.sql b/macros/supporting/string_to_timestamp.sql index ad31b6a6..cf4b1498 100644 --- a/macros/supporting/string_to_timestamp.sql +++ b/macros/supporting/string_to_timestamp.sql @@ -29,4 +29,8 @@ {%- macro fabric__string_to_timestamp(format, timestamp) -%} CONVERT(datetime2(6), '{{ timestamp }}', {{ format }}) -{%- endmacro -%} \ No newline at end of file +{%- endmacro -%} + +{%- macro databricks__string_to_timestamp(format, timestamp) -%} + TO_TIMESTAMP('{{ timestamp }}', '{{ format }}') +{%- endmacro -%} diff --git a/macros/supporting/timestamp_format.sql b/macros/supporting/timestamp_format.sql index 0a3b7308..76a843be 100644 --- a/macros/supporting/timestamp_format.sql +++ b/macros/supporting/timestamp_format.sql @@ -176,4 +176,29 @@ {{ return(timestamp_format) }} +{%- endmacro -%} + + +{%- macro databricks__timestamp_format() %} + +{%- set global_var = var('datavault4dbt.timestamp_format', none) -%} +{%- set timestamp_format = '' -%} + +{%- if global_var is mapping -%} + {%- if 'databricks' in global_var.keys()|map('lower') -%} + {% set timestamp_format = global_var['databricks'] %} + {%- else -%} + {%- if execute -%} + {%- do exceptions.warn("Warning: You have set the global variable 'datavault4dbt.timestamp_format' to a dictionary, but have not included the adapter you use (databricks) as a key. Applying the default value.") -%} + {% endif %} + {%- set timestamp_format = "yyyy-MM-dd HH:mm:ss" -%} + {% endif %} +{%- elif global_var is not mapping and datavault4dbt.is_something(global_var) -%} + {%- set timestamp_format = global_var -%} +{%- else -%} + {%- set timestamp_format = "yyyy-MM-dd HH:mm:ss" -%} +{%- endif -%} + +{{ return(timestamp_format) }} + {%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/databricks/control_snap_v0.sql b/macros/tables/databricks/control_snap_v0.sql new file mode 100644 index 00000000..b2f8bb7c --- /dev/null +++ b/macros/tables/databricks/control_snap_v0.sql @@ -0,0 +1,79 @@ +{%- macro databricks__control_snap_v0(start_date, daily_snapshot_time, sdts_alias, end_date=none) -%} + +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- if not datavault4dbt.is_something(sdts_alias) -%} + {%- set sdts_alias = var('datavault4dbt.sdts_alias', 'sdts') -%} +{%- endif -%} + +WITH + +date_array as( + select sequence(to_timestamp('{{ start_date }} {{ daily_snapshot_time }}'), to_timestamp(current_date()+1), interval 1 day) AS sdts +), + +cte as( + select explode(sdts) as sdts + from date_array +), + +initial_timestamps AS ( + + SELECT * + FROM + cte + WHERE + sdts <= CURRENT_TIMESTAMP + {%- if is_incremental() %} + AND sdts > (SELECT MAX({{ sdts_alias }}) FROM {{ this }}) + {%- endif %} + +), + +enriched_timestamps AS ( + + SELECT + sdts as {{ sdts_alias }}, + TRUE as force_active, + sdts as replacement_sdts, + CONCAT("Snapshot ", DATE(sdts)) as caption, + CASE + WHEN EXTRACT(MINUTE FROM sdts) = 0 AND EXTRACT(SECOND FROM sdts) = 0 THEN TRUE + ELSE FALSE + END as is_hourly, + CASE + WHEN EXTRACT(MINUTE FROM sdts) = 0 AND EXTRACT(SECOND FROM sdts) = 0 AND EXTRACT(HOUR FROM sdts) = 0 THEN TRUE + ELSE FALSE + END as is_daily, + CASE + WHEN EXTRACT(DAYOFWEEK FROM sdts) = 2 THEN TRUE + ELSE FALSE + END as is_weekly, + CASE + WHEN EXTRACT(DAY FROM sdts) = 1 THEN TRUE + ELSE FALSE + END as is_monthly, + CASE + WHEN LAST_DAY(DATE(sdts)) = DATE(sdts) THEN TRUE + ELSE FALSE + END as is_end_of_month, + CASE + WHEN EXTRACT(DAY FROM sdts) = 1 AND EXTRACT(MONTH from sdts) IN (1,4,7,10) THEN TRUE + ELSE FALSE + END AS is_quarterly, + CASE + WHEN EXTRACT(DAY FROM sdts) = 1 AND EXTRACT(MONTH FROM sdts) = 1 THEN TRUE + ELSE FALSE + END as is_yearly, + CASE + WHEN LAST_DAY(DATE(sdts)) = DATE(sdts) AND EXTRACT (MONTH FROM sdts) = 12 THEN TRUE + ELSE FALSE + END AS is_end_of_year, + '' as comment + FROM initial_timestamps + +) + +SELECT * FROM enriched_timestamps + +{%- endmacro -%} diff --git a/macros/tables/databricks/control_snap_v1.sql b/macros/tables/databricks/control_snap_v1.sql new file mode 100644 index 00000000..b5c41a57 --- /dev/null +++ b/macros/tables/databricks/control_snap_v1.sql @@ -0,0 +1,183 @@ +{%- macro databricks__control_snap_v1(control_snap_v0, log_logic, sdts_alias) -%} + +{# Sample intervals + {%-set log_logic = {'daily': {'duration': 3, + 'unit': 'MONTH', + 'forever': 'FALSE'}, + 'weekly': {'duration': 1, + 'unit': 'YEAR'}, + 'monthly': {'duration': 5, + 'unit': 'YEAR'}, + 'yearly': {'forever': 'TRUE'} } %} +#} + +{%- if log_logic is not none %} + {%- for interval in log_logic.keys() %} + {%- if 'forever' not in log_logic[interval].keys() -%} + {% do log_logic[interval].update({'forever': 'FALSE'}) %} + {%- endif -%} + {%- endfor -%} +{%- endif %} + +{%- set v0_relation = ref(control_snap_v0) -%} +{%- set ns = namespace(forever_status=FALSE) %} + +{%- set snapshot_trigger_column = var('datavault4dbt.snapshot_trigger_column', 'is_active') -%} + +WITH + +latest_row AS ( + + SELECT + {{ sdts_alias }} + FROM {{ v0_relation }} + ORDER BY {{ sdts_alias }} DESC + LIMIT 1 + +), + +virtual_logic AS ( + + SELECT + c.{{ sdts_alias }}, + c.replacement_sdts, + c.force_active, + {%- if log_logic is none %} + TRUE as {{ snapshot_trigger_column }}, + {%- else %} + CASE + WHEN + {% if 'daily' in log_logic.keys() %} + {%- if log_logic['daily']['forever'] is true -%} + {%- set ns.forever_status = 'TRUE' -%} + (1=1) + {%- else %} + + {%- set daily_duration = log_logic['daily']['duration'] -%} + {%- set daily_unit = log_logic['daily']['unit'] -%} + + (c.{{ sdts_alias }} BETWEEN TRY_SUBTRACT(CURRENT_TIMESTAMP(), INTERVAL '{{ daily_duration }}' {{ daily_unit }}) AND CURRENT_TIMESTAMP()) + {%- endif -%} + {%- endif %} + + {%- if 'weekly' in log_logic.keys() %} + OR + {%- if log_logic['weekly']['forever'] is true -%} + {%- set ns.forever_status = 'TRUE' -%} + (c.is_weekly = TRUE) + {%- else %} + + {%- set weekly_duration = log_logic['weekly']['duration'] -%} + {%- set weekly_unit = log_logic['weekly']['unit'] -%} + + ( + (c.{{ sdts_alias }} BETWEEN TRY_SUBTRACT(CURRENT_TIMESTAMP(), INTERVAL '{{ weekly_duration }}' {{ weekly_unit }}) AND CURRENT_TIMESTAMP() ) + AND + (c.is_weekly = TRUE) + ) + {%- endif -%} + {% endif -%} + + {%- if 'monthly' in log_logic.keys() %} + OR + {%- if log_logic['monthly']['forever'] is true -%} + {%- set ns.forever_status = 'TRUE' -%} + (c.is_monthly = TRUE) + {%- else %} + + {%- set monthly_duration = log_logic['monthly']['duration'] -%} + {%- set monthly_unit = log_logic['monthly']['unit'] -%} + + ( + (c.{{ sdts_alias }} BETWEEN TRY_SUBTRACT(CURRENT_TIMESTAMP(), INTERVAL '{{ monthly_duration }}' {{ monthly_unit }}) AND CURRENT_TIMESTAMP() ) + AND + (c.is_monthly = TRUE) + ) + {%- endif -%} + {% endif -%} + + {%- if 'yearly' in log_logic.keys() %} + OR + {%- if log_logic['yearly']['forever'] is true -%} + {%- set ns.forever_status = 'TRUE' -%} + (c.is_yearly = TRUE) + {%- else %} + + {%- set yearly_duration = log_logic['yearly']['duration'] -%} + {%- set yearly_unit = log_logic['yearly']['unit'] -%} + + ( + (c.{{ sdts_alias }} BETWEEN TRY_SUBTRACT(CURRENT_TIMESTAMP(), INTERVAL '{{ yearly_duration }}' {{ yearly_unit }}) AND CURRENT_TIMESTAMP() ) + AND + (c.is_yearly = TRUE) + ) + {%- endif -%} + {% endif %} + THEN TRUE + ELSE FALSE + + END AS {{ snapshot_trigger_column }}, + {%- endif %} + + CASE + WHEN l.{{ sdts_alias }} IS NULL THEN FALSE + ELSE TRUE + END AS is_latest, + + c.caption, + c.is_hourly, + c.is_daily, + c.is_weekly, + c.is_monthly, + c.is_yearly, + CASE + WHEN EXTRACT(YEAR FROM c.{{ sdts_alias }}) = EXTRACT(YEAR FROM CURRENT_DATE()) THEN TRUE + ELSE FALSE + END AS is_current_year, + CASE + WHEN EXTRACT(YEAR FROM c.{{ sdts_alias }}) = EXTRACT(YEAR FROM CURRENT_DATE())-1 THEN TRUE + ELSE FALSE + END AS is_last_year, + CASE + WHEN c.{{ sdts_alias }} BETWEEN TRY_SUBTRACT(CURRENT_TIMESTAMP(), INTERVAL '1' YEAR) AND CURRENT_TIMESTAMP() THEN TRUE + ELSE FALSE + END AS is_rolling_year, + CASE + WHEN c.{{ sdts_alias }} BETWEEN TRY_SUBTRACT(CURRENT_TIMESTAMP(), INTERVAL '2' YEAR) AND TRY_SUBTRACT(CURRENT_TIMESTAMP(), INTERVAL '1' YEAR) THEN TRUE + ELSE FALSE + END AS is_last_rolling_year, + c.comment + FROM {{ v0_relation }} c + LEFT JOIN latest_row l + ON c.{{ sdts_alias }} = l.{{ sdts_alias }} + +), + +active_logic_combined AS ( + + SELECT + {{ sdts_alias }}, + replacement_sdts, + CASE + WHEN force_active AND {{ snapshot_trigger_column }} THEN TRUE + WHEN NOT force_active OR NOT {{ snapshot_trigger_column }} THEN FALSE + END AS {{ snapshot_trigger_column }}, + is_latest, + caption, + is_hourly, + is_daily, + is_weekly, + is_monthly, + is_yearly, + is_current_year, + is_last_year, + is_rolling_year, + is_last_rolling_year, + comment + FROM virtual_logic + +) + +SELECT * FROM active_logic_combined + +{%- endmacro -%} diff --git a/macros/tables/databricks/hub.sql b/macros/tables/databricks/hub.sql new file mode 100644 index 00000000..f0555b15 --- /dev/null +++ b/macros/tables/databricks/hub.sql @@ -0,0 +1,236 @@ +{%- macro databricks__hub(hashkey, business_keys, src_ldts, src_rsrc, source_models, disable_hwm) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{{ log('source_models'~source_models, false) }} + +{%- set ns = namespace(last_cte= "", source_included_before = {}, has_rsrc_static_defined=true, source_models_rsrc_dict={}) -%} + +{# Select the Business Key column from the first source model definition provided in the hub model and put them in an array. #} +{%- set business_keys = datavault4dbt.expand_column_list(columns=[business_keys]) -%} + +{# If no specific bk_columns is defined for each source, we apply the values set in the business_keys variable. #} +{# If no specific hk_column is defined for each source, we apply the values set in the hashkey variable. #} +{# If no rsrc_static parameter is defined in ANY of the source models then the whole code block of record_source performance lookup is not executed #} +{# For the use of record_source performance lookup it is required that every source model has the parameter rsrc_static defined and it cannot be an empty string #} +{%- if source_models is not mapping and not datavault4dbt.is_list(source_models) -%} + {%- set source_models = {source_models: {}} -%} +{%- endif -%} + +{%- set source_model_values = fromjson(datavault4dbt.source_model_processing(source_models=source_models, parameters={'hk_column':hashkey}, business_keys=business_keys)) -%} +{%- set source_models = source_model_values['source_model_list'] -%} +{%- set ns.has_rsrc_static_defined = source_model_values['has_rsrc_static_defined'] -%} +{%- set ns.source_models_rsrc_dict = source_model_values['source_models_rsrc_dict'] -%} +{{ log('source_models: '~source_models, false) }} + +{%- set final_columns_to_select = [hashkey] + business_keys + [src_ldts] + [src_rsrc] -%} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{% if is_incremental() -%} +{# Get all target hashkeys out of the existing hub for later incremental logic. #} + distinct_target_hashkeys AS ( + + SELECT + {{ hashkey }} + FROM {{ this }} + + ), + {%- if ns.has_rsrc_static_defined and not disable_hwm -%} + {% for source_model in source_models %} + {# Create a query with a rsrc_static column with each rsrc_static for each source model. #} + {%- set source_number = source_model.id | string -%} + {%- set rsrc_statics = ns.source_models_rsrc_dict[source_number] -%} + + {{log('rsrc_statics: '~ rsrc_statics, false) }} + + {%- set rsrc_static_query_source -%} + SELECT count(*) FROM ( + {%- for rsrc_static in rsrc_statics -%} + SELECT t.{{ src_rsrc }}, + '{{ rsrc_static }}' AS rsrc_static + FROM {{ this }} t + WHERE {{ src_rsrc }} like '{{ rsrc_static }}' + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor -%} + ) + {% endset %} + + {{ log('rsrc static query: '~rsrc_static_query_source, false) }} + + rsrc_static_{{ source_number }} AS ( + {%- for rsrc_static in rsrc_statics -%} + SELECT + t.*, + '{{ rsrc_static }}' AS rsrc_static + FROM {{ this }} t + WHERE {{ src_rsrc }} like '{{ rsrc_static }}' + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor -%} + {%- set ns.last_cte = "rsrc_static_{}".format(source_number) -%} + ), + + {%- set source_in_target = true -%} + + {%- if execute -%} + {%- set rsrc_static_result = run_query(rsrc_static_query_source) -%} + + {%- set row_count = rsrc_static_result.columns[0].values()[0] -%} + + {{ log('row_count for '~source_model~' is '~row_count, false) }} + + {%- if row_count == 0 -%} + {%- set source_in_target = false -%} + {%- endif -%} + {%- endif -%} + + + {%- do ns.source_included_before.update({source_model.id: source_in_target}) -%} + + {% endfor -%} + + {%- if source_models | length > 1 %} + + rsrc_static_union AS ( + {# Create one unionized table over all sources. It will be the same as the already existing + hub, but extended by the rsrc_static column. #} + {% for source_model in source_models %} + {%- set source_number = source_model.id | string -%} + + SELECT rsrc_static_{{ source_number }}.* FROM rsrc_static_{{ source_number }} + + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor %} + {%- set ns.last_cte = "rsrc_static_union" -%} + ), + + {%- endif %} + + max_ldts_per_rsrc_static_in_target AS ( + {# Use the previously created CTE to calculate the max load date timestamp per rsrc_static. #} + SELECT + rsrc_static, + MAX({{ src_ldts }}) as max_ldts + FROM {{ ns.last_cte }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + GROUP BY rsrc_static + + ), + {%- endif %} +{% endif -%} + +{% for source_model in source_models %} + + {%- set source_number = source_model.id | string -%} + + {%- if ns.has_rsrc_static_defined -%} + {%- set rsrc_statics = ns.source_models_rsrc_dict[source_number|string] -%} + {%- endif -%} + + {%- if 'hk_column' not in source_model.keys() %} + {%- set hk_column = hashkey -%} + {%- else -%} + {%- set hk_column = source_model['hk_column'] -%} + {% endif %} + + src_new_{{ source_number }} AS ( + + SELECT + {{ hk_column }} AS {{ hashkey }}, + {% for bk in source_model['bk_columns'] -%} + {{ bk }} AS {{ business_keys[loop.index - 1] }}, + {% endfor -%} + + {{ src_ldts }}, + {{ src_rsrc }} + FROM {{ ref(source_model.name) }} src + {{ log('rsrc_statics defined?: ' ~ ns.source_models_rsrc_dict[source_number|string], false) }} + + {%- if is_incremental() and ns.has_rsrc_static_defined and ns.source_included_before[source_number|int] and not disable_hwm %} + INNER JOIN max_ldts_per_rsrc_static_in_target max ON + ({%- for rsrc_static in rsrc_statics -%} + max.rsrc_static = '{{ rsrc_static }}' + {%- if not loop.last -%} OR + {% endif -%} + {%- endfor %}) + WHERE src.{{ src_ldts }} > max.max_ldts + {%- elif is_incremental() and source_models | length == 1 and not ns.has_rsrc_static_defined and not disable_hwm %} + WHERE src.{{ src_ldts }} > ( + SELECT MAX({{ src_ldts }}) + FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + ) + {%- endif %} + + {%- set ns.last_cte = "src_new_{}".format(source_number) %} + + ), +{%- endfor -%} + +{%- if source_models | length > 1 %} + +source_new_union AS ( + + {%- for source_model in source_models -%} + + {%- set source_number = source_model.id | string -%} + + SELECT + {{ hashkey }}, + + {% for bk in source_model['bk_columns'] -%} + {{ business_keys[loop.index - 1] }}, + {% endfor -%} + + {{ src_ldts }}, + {{ src_rsrc }} + FROM src_new_{{ source_number }} + + {%- if not loop.last %} + UNION ALL + {% endif -%} + + {%- endfor -%} + + {%- set ns.last_cte = 'source_new_union' -%} + +), + +{%- endif %} + +earliest_hk_over_all_sources AS ( + + {#- Deduplicate the unionized records again to only insert the earliest one. #} + SELECT + lcte.* + FROM {{ ns.last_cte }} AS lcte + + QUALIFY ROW_NUMBER() OVER (PARTITION BY {{ hashkey }} ORDER BY {{ src_ldts }}) = 1 + + {%- set ns.last_cte = 'earliest_hk_over_all_sources' -%} + +), + +records_to_insert AS ( + {#- Select everything from the previous CTE, if incremental filter for hashkeys that are not already in the hub. #} + SELECT + {{ datavault4dbt.print_list(final_columns_to_select) }} + FROM {{ ns.last_cte }} + + {%- if is_incremental() %} + WHERE {{ hashkey }} NOT IN (SELECT * FROM distinct_target_hashkeys) + {% endif -%} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} diff --git a/macros/tables/databricks/link.sql b/macros/tables/databricks/link.sql new file mode 100644 index 00000000..ecb01383 --- /dev/null +++ b/macros/tables/databricks/link.sql @@ -0,0 +1,240 @@ + +{%- macro databricks__link(link_hashkey, foreign_hashkeys, source_models, src_ldts, src_rsrc, disable_hwm) -%} + +{%- if not (foreign_hashkeys is iterable and foreign_hashkeys is not string) -%} + + {%- if execute -%} + {{ exceptions.raise_compiler_error("Only one foreign key provided for this link. At least two required.") }} + {%- endif %} + +{%- endif -%} + +{%- set ns = namespace(last_cte= "", source_included_before = {}, has_rsrc_static_defined=true, source_models_rsrc_dict={}) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{# If no specific link_hk and fk_columns are defined for each source, we apply the values set in the link_hashkey and foreign_hashkeys variable. #} +{# If no rsrc_static parameter is defined in ANY of the source models then the whole code block of record_source performance lookup is not executed #} +{# For the use of record_source performance lookup it is required that every source model has the parameter rsrc_static defined and it cannot be an empty string #} +{%- if source_models is not mapping and not datavault4dbt.is_list(source_models) -%} + {%- set source_models = {source_models: {}} -%} +{%- endif -%} + +{%- set source_model_values = fromjson(datavault4dbt.source_model_processing(source_models=source_models, parameters={'link_hk':link_hashkey}, foreign_hashkeys=foreign_hashkeys)) -%} +{%- set source_models = source_model_values['source_model_list'] -%} +{%- set ns.has_rsrc_static_defined = source_model_values['has_rsrc_static_defined'] -%} +{%- set ns.source_models_rsrc_dict = source_model_values['source_models_rsrc_dict'] -%} +{{ log('source_models: '~source_models, false) }} + +{%- set final_columns_to_select = [link_hashkey] + foreign_hashkeys + [src_ldts] + [src_rsrc] -%} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{% if is_incremental() %} +{# Get all link hashkeys out of the existing link for later incremental logic. #} + distinct_target_hashkeys AS ( + + SELECT + {{ link_hashkey }} + FROM {{ this }} + + ), + {%- if ns.has_rsrc_static_defined and not disable_hwm -%} + {% for source_model in source_models %} + {# Create a query with a rsrc_static column with each rsrc_static for each source model. #} + {%- set source_number = source_model.id | string -%} + {%- set rsrc_statics = ns.source_models_rsrc_dict[source_number] -%} + + {{log('rsrc_statics: '~ rsrc_statics, false) }} + + {%- set rsrc_static_query_source -%} + SELECT count(*) FROM ( + {%- for rsrc_static in rsrc_statics -%} + SELECT t.{{ src_rsrc }}, + '{{ rsrc_static }}' AS rsrc_static + FROM {{ this }} t + WHERE {{ src_rsrc }} like '{{ rsrc_static }}' + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor -%} + ) + {% endset %} + + rsrc_static_{{ source_number }} AS ( + {%- for rsrc_static in rsrc_statics -%} + SELECT t.*, + '{{ rsrc_static }}' AS rsrc_static + FROM {{ this }} t + WHERE {{ src_rsrc }} like '{{ rsrc_static }}' + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor -%} + {%- set ns.last_cte = "rsrc_static_{}".format(source_number) -%} + ), + + {%- set source_in_target = true -%} + + {%- if execute -%} + {%- set rsrc_static_result = run_query(rsrc_static_query_source) -%} + + {%- set row_count = rsrc_static_result.columns[0].values()[0] -%} + + {{ log('row_count for '~source_model~' is '~row_count, false) }} + + {%- if row_count == 0 -%} + {%- set source_in_target = false -%} + {%- endif -%} + {%- endif -%} + + + {%- do ns.source_included_before.update({source_model.id: source_in_target}) -%} + + {% endfor -%} + + {%- if source_models | length > 1 %} + + rsrc_static_union AS ( + {# Create one unionized table over all sources. It will be the same as the already existing + link, but extended by the rsrc_static column. #} + + {% for source_model in source_models %} + {%- set source_number = source_model.id | string -%} + + SELECT rsrc_static_{{ source_number }}.* FROM rsrc_static_{{ source_number }} + + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor %} + {%- set ns.last_cte = "rsrc_static_union" -%} + ), + + {%- endif %} + + max_ldts_per_rsrc_static_in_target AS ( + {# Use the previously created CTE to calculate the max load date timestamp per rsrc_static. #} + + SELECT + rsrc_static, + MAX({{ src_ldts }}) as max_ldts + FROM {{ ns.last_cte }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + GROUP BY rsrc_static + + ), + {%- endif %} +{% endif -%} + +{% for source_model in source_models %} + +{# Select all deduplicated records from each source, and filter for records that are newer + than the max ldts inside the existing link, if incremental. #} + + {%- set source_number = source_model.id | string -%} + + {%- if ns.has_rsrc_static_defined -%} + {%- set rsrc_statics = ns.source_models_rsrc_dict[source_number|string] -%} + {%- endif -%} + + {%- if 'link_hk' not in source_model.keys() %} + {%- set link_hk = link_hashkey -%} + {%- else -%} + {%- set link_hk = source_model['link_hk'] -%} + {% endif %} + + src_new_{{ source_number }} AS ( + + SELECT + {{ link_hk }} AS {{ link_hashkey }}, + {% for fk in source_model['fk_columns'] -%} + {{ fk }}, + {% endfor -%} + {{ src_ldts }}, + {{ src_rsrc }} + FROM {{ ref(source_model.name) }} src + {{ log('rsrc_statics defined?: ' ~ ns.source_models_rsrc_dict[source_number|string], false) }} + + {%- if is_incremental() and ns.has_rsrc_static_defined and ns.source_included_before[source_number|int] and not disable_hwm %} + INNER JOIN max_ldts_per_rsrc_static_in_target max ON + ({%- for rsrc_static in rsrc_statics -%} + max.rsrc_static = '{{ rsrc_static }}' + {%- if not loop.last -%} OR + {% endif -%} + {%- endfor %}) + WHERE src.{{ src_ldts }} > max.max_ldts + {%- elif is_incremental() and source_models | length == 1 and not ns.has_rsrc_static_defined and not disable_hwm %} + WHERE src.{{ src_ldts }} > ( + SELECT MAX({{ src_ldts }}) + FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + ) + {%- endif %} + + {%- set ns.last_cte = "src_new_{}".format(source_number) %} + + ), +{%- endfor -%} + +{%- if source_models | length > 1 %} + +source_new_union AS ( +{# Unionize the new records from all sources. #} + + {%- for source_model in source_models -%} + + {%- set source_number = source_model.id | string -%} + + SELECT + {{ link_hashkey }}, + {% for fk in source_model['fk_columns']|list %} + {{ fk }} AS {{ foreign_hashkeys[loop.index - 1] }}, + {% endfor -%} + {{ src_ldts }}, + {{ src_rsrc }} + FROM src_new_{{ source_number }} + + {%- if not loop.last %} + UNION ALL + {% endif -%} + + {%- endfor -%} + + {%- set ns.last_cte = 'source_new_union' -%} + +), + +{%- endif %} + +earliest_hk_over_all_sources AS ( + {# Deduplicate the unionized records again to only insert the earliest one. #} + + SELECT + lcte.* + FROM {{ ns.last_cte }} AS lcte + + QUALIFY ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts }}) = 1 + + {%- set ns.last_cte = 'earliest_hk_over_all_sources' -%} + +), + +records_to_insert AS ( + {# Select everything from the previous CTE, if incremental filter for hashkeys that are not already in the link. #} + + SELECT + {{ datavault4dbt.print_list(final_columns_to_select) | indent(4) }} + FROM {{ ns.last_cte }} + + {%- if is_incremental() %} + WHERE {{ link_hashkey }} NOT IN (SELECT * FROM distinct_target_hashkeys) + {% endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} diff --git a/macros/tables/databricks/ma_sat_v0.sql b/macros/tables/databricks/ma_sat_v0.sql new file mode 100644 index 00000000..7ca5c360 --- /dev/null +++ b/macros/tables/databricks/ma_sat_v0.sql @@ -0,0 +1,104 @@ +{%- macro databricks__ma_sat_v0(parent_hashkey, src_hashdiff, src_ma_key, src_payload, src_ldts, src_rsrc, source_model) -%} + +{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%} +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set ns=namespace(src_hashdiff="", hdiff_alias="") %} +{%- if src_hashdiff is mapping and src_hashdiff is not none -%} + {% set ns.src_hashdiff = src_hashdiff["source_column"] %} + {% set ns.hdiff_alias = src_hashdiff["alias"] %} +{% else %} + {% set ns.src_hashdiff = src_hashdiff %} + {% set ns.hdiff_alias = src_hashdiff %} +{%- endif -%} + +{%- set source_cols = datavault4dbt.expand_column_list(columns=[src_rsrc, src_ldts, src_ma_key, src_payload]) -%} + +{%- set source_relation = ref(source_model) -%} + + +WITH + +{# Selecting all source data, that is newer than latest data in sat if incremental #} +source_data AS ( + + SELECT + {{ parent_hashkey }}, + {{ ns.src_hashdiff }} as {{ ns.hdiff_alias }}, + {{ datavault4dbt.print_list(source_cols) }} + FROM {{ source_relation }} + + {%- if is_incremental() %} + WHERE {{ src_ldts }} > ( + SELECT + MAX({{ src_ldts }}) FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + ) + {%- endif %} + +), + +{# Get the latest record for each parent hashkey in existing sat, if incremental. #} +{%- if is_incremental() %} +latest_entries_in_sat AS ( + + SELECT + {{ parent_hashkey }}, + {{ ns.hdiff_alias }} + FROM + {{ this }} + QUALIFY ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }} DESC) = 1 +), +{%- endif %} + +{# Get a list of all distinct hashdiffs that exist for each parent_hashkey. #} +deduped_row_hashdiff AS ( + + SELECT + {{ parent_hashkey }}, + {{ src_ldts }}, + {{ ns.hdiff_alias }} + FROM source_data + QUALIFY CASE + WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER (PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) THEN FALSE + ELSE TRUE + END +), + +{# Dedupe the source data regarding non-delta groups. #} +deduped_rows AS ( + + SELECT + source_data.{{ parent_hashkey }}, + source_data.{{ ns.hdiff_alias }}, + {{ datavault4dbt.alias_all(columns=source_cols, prefix='source_data') }} + FROM source_data + INNER JOIN deduped_row_hashdiff + ON {{ datavault4dbt.multikey(parent_hashkey, prefix=['source_data', 'deduped_row_hashdiff'], condition='=') }} + AND {{ datavault4dbt.multikey(src_ldts, prefix=['source_data', 'deduped_row_hashdiff'], condition='=') }} + AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['source_data', 'deduped_row_hashdiff'], condition='=') }} + +), + +records_to_insert AS ( + + SELECT + deduped_rows.{{ parent_hashkey }}, + deduped_rows.{{ ns.hdiff_alias }}, + {{ datavault4dbt.alias_all(columns=source_cols, prefix='deduped_rows') }} + FROM deduped_rows + {%- if is_incremental() %} + WHERE NOT EXISTS ( + SELECT 1 + FROM latest_entries_in_sat + WHERE {{ datavault4dbt.multikey(parent_hashkey, prefix=['latest_entries_in_sat', 'deduped_rows'], condition='=') }} + AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduped_rows'], condition='=') }} + ) + {%- endif %} + + ) + +SELECT * FROM records_to_insert + +{%- endmacro -%} diff --git a/macros/tables/databricks/ma_sat_v1.sql b/macros/tables/databricks/ma_sat_v1.sql new file mode 100644 index 00000000..f6c8bbed --- /dev/null +++ b/macros/tables/databricks/ma_sat_v1.sql @@ -0,0 +1,75 @@ +{%- macro databricks__ma_sat_v1(sat_v0, hashkey, hashdiff, ma_attribute, src_ldts, src_rsrc, ledts_alias, add_is_current_flag) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set is_current_col_alias = var('datavault4dbt.is_current_col_alias', 'IS_CURRENT') -%} + +{%- set source_relation = ref(sat_v0) -%} +{%- set all_columns = datavault4dbt.source_columns(source_relation=source_relation) -%} +{%- set exclude = datavault4dbt.expand_column_list(columns=[hashkey, hashdiff, ma_attribute, src_ldts, src_rsrc]) -%} +{%- set ma_attributes = datavault4dbt.expand_column_list(columns=[ma_attribute]) -%} + + +{%- set source_columns_to_select = datavault4dbt.process_columns_to_select(all_columns, exclude) -%} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# Getting everything from the underlying v0 satellite. #} +source_satellite AS ( + + SELECT src.* + FROM {{ source_relation }} as src + +), + +{# Selecting all distinct loads per hashkey. #} +distinct_hk_ldts AS ( + + SELECT DISTINCT + {{ hashkey }}, + {{ src_ldts }} + FROM source_satellite + +), + +{# End-dating each ldts for each hashkey, based on earlier ldts per hashkey. #} +end_dated_loads AS ( + + SELECT + {{ hashkey }}, + {{ src_ldts }}, + COALESCE(LEAD(TRY_SUBTRACT({{ src_ldts }}, INTERVAL 1 MICROSECOND)) OVER (PARTITION BY {{ hashkey }} ORDER BY {{ src_ldts }}),{{ datavault4dbt.string_to_timestamp(timestamp_format,end_of_all_times) }}) as {{ ledts_alias }} + FROM distinct_hk_ldts + +), + +{# End-date each source record, based on the end-date for each load. #} +end_dated_source AS ( + + SELECT + src.{{ hashkey }}, + src.{{ hashdiff }}, + src.{{ src_rsrc }}, + src.{{ src_ldts }}, + edl.{{ ledts_alias }}, + {%- if add_is_current_flag %} + CASE WHEN {{ ledts_alias }} = {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + THEN TRUE + ELSE FALSE + END AS {{ is_current_col_alias }}, + {% endif %} + {{- datavault4dbt.print_list(ma_attributes, indent=10, src_alias='src') }}, + {{- datavault4dbt.print_list(source_columns_to_select, indent=10, src_alias='src') }} + FROM source_satellite AS src + LEFT JOIN end_dated_loads edl + ON src.{{ hashkey }} = edl.{{ hashkey }} + AND src.{{ src_ldts }} = edl.{{ src_ldts }} + +) + +SELECT * FROM end_dated_source + +{%- endmacro -%} diff --git a/macros/tables/databricks/nh_link.sql b/macros/tables/databricks/nh_link.sql new file mode 100644 index 00000000..d423a3ed --- /dev/null +++ b/macros/tables/databricks/nh_link.sql @@ -0,0 +1,254 @@ +{%- macro databricks__nh_link(link_hashkey, foreign_hashkeys, payload, source_models, src_ldts, src_rsrc, disable_hwm, source_is_single_batch) -%} +{%- if not (foreign_hashkeys is iterable and foreign_hashkeys is not string) -%} + + {%- if execute -%} + {{ exceptions.raise_compiler_error("Only one foreign key provided for this link. At least two required.") }} + {%- endif %} + +{%- endif -%} +{%- set ns = namespace(last_cte= "", source_included_before = {}, has_rsrc_static_defined=true, source_models_rsrc_dict={}) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + + +{# If no specific link_hk, fk_columns, or payload are defined for each source, we apply the values set in the link_hashkey, foreign_hashkeys, and payload variable. #} +{# If no rsrc_static parameter is defined in ANY of the source models then the whole code block of record_source performance lookup is not executed #} +{# For the use of record_source performance lookup it is required that every source model has the parameter rsrc_static defined and it cannot be an empty string #} +{%- if source_models is not mapping and not datavault4dbt.is_list(source_models) -%} + {%- set source_models = {source_models: {}} -%} +{%- endif -%} + +{%- set source_model_values = fromjson(datavault4dbt.source_model_processing(source_models=source_models, parameters={'link_hk':link_hashkey}, foreign_hashkeys=foreign_hashkeys, payload=payload)) -%} +{%- set source_models = source_model_values['source_model_list'] -%} +{%- set ns.has_rsrc_static_defined = source_model_values['has_rsrc_static_defined'] -%} +{%- set ns.source_models_rsrc_dict = source_model_values['source_models_rsrc_dict'] -%} +{{ log('source_models: '~source_models, false) }} + +{%- set final_columns_to_select = [link_hashkey] + foreign_hashkeys + [src_ldts] + [src_rsrc] + payload -%} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{%- if is_incremental() -%} +{# Get all link hashkeys out of the existing link for later incremental logic. #} + distinct_target_hashkeys AS ( + + SELECT + {{ link_hashkey }} + FROM {{ this }} + + ), + {%- if ns.has_rsrc_static_defined and not disable_hwm -%} + {% for source_model in source_models %} + {# Create a query with a rsrc_static column with each rsrc_static for each source model. #} + {%- set source_number = source_model.id | string -%} + {%- set rsrc_statics = ns.source_models_rsrc_dict[source_number] -%} + + {{log('rsrc_statics: '~ rsrc_statics, false) }} + + {%- set rsrc_static_query_source -%} + SELECT count(*) FROM ( + {%- for rsrc_static in rsrc_statics -%} + SELECT t.{{ src_rsrc }}, + '{{ rsrc_static }}' AS rsrc_static + FROM {{ this }} t + WHERE {{ src_rsrc }} like '{{ rsrc_static }}' + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor -%} + ) + {% endset %} + + rsrc_static_{{ source_number }} AS ( + {%- for rsrc_static in rsrc_statics -%} + SELECT t.*, + '{{ rsrc_static }}' AS rsrc_static + FROM {{ this }} t + WHERE {{ src_rsrc }} like '{{ rsrc_static }}' + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor -%} + {%- set ns.last_cte = "rsrc_static_{}".format(source_number) -%} + ), + + {%- set source_in_target = true -%} + + {%- if execute -%} + {%- set rsrc_static_result = run_query(rsrc_static_query_source) -%} + + {%- set row_count = rsrc_static_result.columns[0].values()[0] -%} + + {{ log('row_count for '~source_model~' is '~row_count, false) }} + + {%- if row_count == 0 -%} + {%- set source_in_target = false -%} + {%- endif -%} + {%- endif -%} + + + {%- do ns.source_included_before.update({source_model.id: source_in_target}) -%} + + {% endfor -%} + + {%- if source_models | length > 1 %} + + rsrc_static_union AS ( + {# Create one unionized table over all sources. It will be the same as the already existing + nh_link, but extended by the rsrc_static column. #} + + {% for source_model in source_models %} + {%- set source_number = source_model.id | string -%} + + SELECT rsrc_static_{{ source_number }}.* FROM rsrc_static_{{ source_number }} + + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor %} + {%- set ns.last_cte = "rsrc_static_union" -%} + ), + + {%- endif %} + + max_ldts_per_rsrc_static_in_target AS ( + {# Use the previously created CTE to calculate the max load date timestamp per rsrc_static. #} + + SELECT + rsrc_static, + MAX({{ src_ldts }}) AS max_ldts + FROM {{ ns.last_cte }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + GROUP BY rsrc_static + + ), + {%- endif %} +{% endif -%} + +{% for source_model in source_models %} + +{# Select all deduplicated records from each source, and filter for records that are newer + than the max ldts inside the existing link, if incremental. #} + + {%- set source_number = source_model.id | string -%} + + {%- if ns.has_rsrc_static_defined -%} + {%- set rsrc_statics = ns.source_models_rsrc_dict[source_number|string] -%} + {%- endif -%} + + {%- if 'link_hk' not in source_model.keys() %} + {%- set link_hk = link_hashkey -%} + {%- else -%} + {%- set link_hk = source_model['link_hk'] -%} + {% endif %} + +src_new_{{ source_number }} AS ( + + SELECT + {{ link_hk }} AS {{ link_hashkey }}, + {% for fk in source_model['fk_columns'] -%} + {{ fk }}, + {% endfor -%} + {{ src_ldts }}, + {{ src_rsrc }}, + + {{ datavault4dbt.print_list(source_model['payload']) | indent(3) }} + + FROM {{ ref(source_model.name) }} src + {# If the model is incremental and all sources has rsrc_static defined and valid and the source was already included before in the target transactional link #} + {# then an inner join is performed on the CTE for the maximum load date timestamp per record source static to get the records + that match any of the rsrc_static present in it #} + {# if there are records in the source with a newer load date time stamp than the ones present in the target, those will be selected to be inserted later #} + {%- if is_incremental() and ns.has_rsrc_static_defined and ns.source_included_before[source_number|int] and not disable_hwm %} + INNER JOIN max_ldts_per_rsrc_static_in_target max ON + ({%- for rsrc_static in rsrc_statics -%} + max.rsrc_static = '{{ rsrc_static }}' + {%- if not loop.last -%} OR + {% endif -%} + {%- endfor %}) + WHERE src.{{ src_ldts }} > max.max_ldts + {%- elif is_incremental() and source_models | length == 1 and not ns.has_rsrc_static_defined and not disable_hwm %} + WHERE src.{{ src_ldts }} > ( + SELECT MAX({{ src_ldts }}) + FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + ) + {%- endif %} + + {%- set ns.last_cte = "src_new_{}".format(source_number) %} + + ), +{%- endfor -%} + +{%- if source_models | length > 1 %} + +source_new_union AS ( +{# Unionize the new records from all sources. #} + + {%- for source_model in source_models -%} + + {%- set source_number = source_model.id | string -%} + + SELECT + {{ link_hashkey }}, + {% for fk in source_model['fk_columns']|list %} + {{ fk }} AS {{ foreign_hashkeys[loop.index - 1] }}, + {% endfor -%} + + {{ src_ldts }}, + {{ src_rsrc }}, + + {% for col in source_model['payload']|list %} + {{ col }} AS {{ payload[loop.index - 1] }} + {%- if not loop.last %}, {%- endif %} + {% endfor -%} + + FROM src_new_{{ source_number }} + + {%- if not loop.last %} + UNION ALL + {% endif -%} + + {%- endfor -%} + + {%- set ns.last_cte = 'source_new_union' -%} + +), + +{%- endif %} + +{%- if not source_is_single_batch %} + +earliest_hk_over_all_sources AS ( +{# Deduplicate the unionized records again to only insert the earliest one. #} + + SELECT + lcte.* + FROM {{ ns.last_cte }} AS lcte + + QUALIFY ROW_NUMBER() OVER (PARTITION BY {{ link_hashkey }} ORDER BY {{ src_ldts }}) = 1 + + {%- set ns.last_cte = 'earliest_hk_over_all_sources' -%} + +), + +{%- endif %} + +records_to_insert AS ( +{# Select everything from the previous CTE, if its incremental then filter for hashkeys that are not already in the link. #} + + SELECT + {{ datavault4dbt.print_list(final_columns_to_select) | indent(4) }} + FROM {{ ns.last_cte }} + + {%- if is_incremental() %} + WHERE {{ link_hashkey }} NOT IN (SELECT * FROM distinct_target_hashkeys) + {% endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} diff --git a/macros/tables/databricks/nh_sat.sql b/macros/tables/databricks/nh_sat.sql new file mode 100644 index 00000000..643c8a18 --- /dev/null +++ b/macros/tables/databricks/nh_sat.sql @@ -0,0 +1,68 @@ +{%- macro databricks__nh_sat(parent_hashkey, src_payload, src_ldts, src_rsrc, source_model, source_is_single_batch) -%} + +{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%} +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set source_cols = datavault4dbt.expand_column_list(columns=[parent_hashkey, src_ldts, src_rsrc, src_payload]) -%} + +{%- set source_relation = ref(source_model) -%} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# Selecting all source data, that is newer than latest data in sat if incremental #} +source_data AS ( + + SELECT + {{ datavault4dbt.print_list(source_cols) }} + FROM {{ source_relation }} + + {%- if is_incremental() %} + WHERE {{ src_ldts }} > ( + SELECT + MAX({{ src_ldts }}) FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + ) + {%- endif %} + + {% if not source_is_single_batch -%} + + QUALIFY + ROW_NUMBER() OVER (PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) = 1 + + {%- endif %} + +), + +{% if is_incremental() -%} +{# Get distinct list of hashkeys inside the existing satellite, if incremental. #} +distinct_hashkeys AS ( + + SELECT DISTINCT + {{ parent_hashkey }} + FROM {{ this }} + + ), + +{%- endif %} + +{# + Select all records from the source. If incremental, insert only records, where the + hashkey is not already in the existing satellite. +#} +records_to_insert AS ( + + SELECT + {{ datavault4dbt.print_list(source_cols) }} + FROM source_data + {%- if is_incremental() %} + WHERE {{ parent_hashkey }} NOT IN (SELECT * FROM distinct_hashkeys) + {%- endif %} + + ) + +SELECT * FROM records_to_insert + +{%- endmacro -%} diff --git a/macros/tables/databricks/pit.sql b/macros/tables/databricks/pit.sql new file mode 100644 index 00000000..29601647 --- /dev/null +++ b/macros/tables/databricks/pit.sql @@ -0,0 +1,109 @@ +{%- macro databricks__pit(tracked_entity, hashkey, sat_names, ldts, ledts, sdts, snapshot_relation, dimension_key,snapshot_trigger_column=none, custom_rsrc=none, pit_type=none) -%} + +{%- set hash = datavault4dbt.hash_method() -%} +{%- set hash_dtype = var('datavault4dbt.hash_datatype', 'STRING') -%} +{%- set hash_default_values = fromjson(datavault4dbt.hash_default_values(hash_function=hash,hash_datatype=hash_dtype)) -%} +{%- set hash_alg = hash_default_values['hash_alg'] -%} +{%- set unknown_key = hash_default_values['unknown_key'] -%} +{%- set error_key = hash_default_values['error_key'] -%} + +{%- if hash_dtype == 'BYTES' -%} + {%- set hashkey_string = 'TO_HEX({})'.format(datavault4dbt.prefix([hashkey],'te')) -%} +{%- else -%} + {%- set hashkey_string = datavault4dbt.prefix([hashkey],'te') -%} +{%- endif -%} + +{%- set rsrc = var('datavault4dbt.rsrc_alias', 'rsrc') -%} + +{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%} +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- if datavault4dbt.is_something(pit_type) -%} + {%- set hashed_cols = [pit_type, hashkey_string, datavault4dbt.prefix([sdts], 'snap')] -%} +{%- else -%} + {%- set hashed_cols = [hashkey_string, datavault4dbt.prefix([sdts], 'snap')] -%} +{%- endif -%} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{%- if is_incremental() %} + +existing_dimension_keys AS ( + + SELECT + {{ dimension_key }} + FROM {{ this }} + +), + +{%- endif %} + +pit_records AS ( + + SELECT + + {% if datavault4dbt.is_something(pit_type) -%} + {{ datavault4dbt.as_constant(pit_type) }} as type, + {%- endif %} + {% if datavault4dbt.is_something(custom_rsrc) -%} + '{{ custom_rsrc }}' as {{ rsrc }}, + {%- endif %} + {{ datavault4dbt.hash(columns=hashed_cols, + alias=dimension_key, + is_hashdiff=false) }} , + te.{{ hashkey }}, + snap.{{ sdts }}, + {% for satellite in sat_names %} + COALESCE({{ satellite }}.{{ hashkey }}, CAST({{ datavault4dbt.as_constant(column_str=unknown_key) }} as {{ hash_dtype }})) AS hk_{{ satellite }}, + COALESCE({{ satellite }}.{{ ldts }}, {{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }}) AS {{ ldts }}_{{ satellite }} + {{- "," if not loop.last }} + {%- endfor %} + + FROM + {{ ref(tracked_entity) }} te + FULL OUTER JOIN + {{ ref(snapshot_relation) }} snap + {% if datavault4dbt.is_something(snapshot_trigger_column) -%} + ON snap.{{ snapshot_trigger_column }} = true + {% else -%} + ON 1=1 + {%- endif %} + {% for satellite in sat_names %} + {%- set sat_columns = datavault4dbt.source_columns(ref(satellite)) %} + {%- if ledts|string|lower in sat_columns|map('lower') %} + LEFT JOIN {{ ref(satellite) }} + {%- else %} + LEFT JOIN ( + SELECT + {{ hashkey }}, + {{ ldts }}, + COALESCE(LEAD(TIMESTAMP_SUB({{ ldts }}, INTERVAL 1 MICROSECOND)) OVER (PARTITION BY {{ hashkey }} ORDER BY {{ ldts }}),{{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}) AS {{ ledts }} + FROM {{ ref(satellite) }} + ) {{ satellite }} + {% endif %} + ON + {{ satellite }}.{{ hashkey}} = te.{{ hashkey }} + AND snap.{{ sdts }} BETWEEN {{ satellite }}.{{ ldts }} AND {{ satellite }}.{{ ledts }} + {% endfor %} + {% if datavault4dbt.is_something(snapshot_trigger_column) -%} + WHERE snap.{{ snapshot_trigger_column }} + {%- endif %} + +), + +records_to_insert AS ( + + SELECT DISTINCT * + FROM pit_records + {%- if is_incremental() %} + WHERE {{ dimension_key }} NOT IN (SELECT * FROM existing_dimension_keys) + {% endif -%} + +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} diff --git a/macros/tables/databricks/rec_track_sat.sql b/macros/tables/databricks/rec_track_sat.sql new file mode 100644 index 00000000..853e55ec --- /dev/null +++ b/macros/tables/databricks/rec_track_sat.sql @@ -0,0 +1,223 @@ +{%- macro databricks__rec_track_sat(tracked_hashkey, source_models, src_ldts, src_rsrc, src_stg, disable_hwm) -%} + +{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%} +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{# Setting the unknown and error ghost record value for record source column #} +{%- set rsrc_unknown = var('datavault4dbt.default_unknown_rsrc', 'SYSTEM') -%} +{%- set rsrc_error = var('datavault4dbt.default_error_rsrc', 'ERROR') -%} + +{# Setting the rsrc and stg_alias default datatypes #} +{%- set rsrc_default_dtype = datavault4dbt.string_default_dtype(type=rsrc) -%} +{%- set stg_default_dtype = datavault4dbt.string_default_dtype(type=stg) -%} +{%- set ns = namespace(last_cte = '', source_included_before = {}, source_models_rsrc_dict={}, has_rsrc_static_defined=true) -%} + +{%- if source_models is not mapping and not datavault4dbt.is_list(source_models) -%} + {%- set source_models = {source_models: {}} -%} +{%- endif -%} + +{%- set source_model_values = fromjson(datavault4dbt.source_model_processing(source_models=source_models, parameters={'hk_column':tracked_hashkey})) -%} +{%- set source_models = source_model_values['source_model_list'] -%} +{%- set ns.has_rsrc_static_defined = source_model_values['has_rsrc_static_defined'] -%} +{%- set ns.source_models_rsrc_dict = source_model_values['source_models_rsrc_dict'] -%} +{{ log('source_models: '~source_models, false) }} + +{%- set final_columns_to_select = [tracked_hashkey] + [src_ldts] + [src_rsrc] + [src_stg] -%} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{% if is_incremental() %} + + distinct_concated_target AS ( + {%- set concat_columns = [tracked_hashkey, src_ldts, src_rsrc] -%} + {{ "\n" }} + SELECT + {{ datavault4dbt.concat_ws(concat_columns) }} as concat + FROM {{ this }} + ), + {%- if ns.has_rsrc_static_defined and not disable_hwm -%} + rsrc_static_unionized AS ( + {% for source_model in source_models %} + {# Create a query with a rsrc_static column with each rsrc_static for each source model. #} + {%- set source_number = source_model.id | string -%} + {%- set hk_column = source_model['hk_column'] -%} + {%- set rsrc_statics = ns.source_models_rsrc_dict[source_number] -%} + + {%- set rsrc_static_query_source_count -%} + SELECT count(*) FROM ( + {%- for rsrc_static in rsrc_statics -%} + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }}, + '{{ rsrc_static }}' AS rsrc_static + FROM {{ this }} + WHERE {{ src_rsrc }} like '{{ rsrc_static }}' + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor -%} + ) + {% endset %} + + {%- set rsrc_static_query_source -%} + {%- for rsrc_static in rsrc_statics -%} + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }}, + '{{ rsrc_static }}' AS rsrc_static + FROM {{ this }} + WHERE {{ src_rsrc }} like '{{ rsrc_static }}' + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor -%} + {% endset %} + + {{ rsrc_static_query_source }} + + {%- set source_in_target = true -%} + + {%- if execute -%} + {%- set rsrc_static_result = run_query(rsrc_static_query_source_count) -%} + + {%- set row_count = rsrc_static_result.columns[0].values()[0] -%} + + {{ log('row_count for '~source_model~' is '~row_count, false) }} + + {%- if row_count == 0 -%} + {%- set source_in_target = false -%} + {%- endif -%} + {%- endif -%} + + {%- do ns.source_included_before.update({source_model.id: source_in_target}) -%} + {# Unionize over all sources #} + {%- if not loop.last %} + UNION ALL + {% endif -%} + + {% endfor -%} + {%- set ns.last_cte = "rsrc_static_unionized" -%} + ), + + max_ldts_per_rsrc_static_in_target AS ( + + SELECT + rsrc_static, + MAX({{ src_ldts }}) as max_ldts + FROM {{ ns.last_cte }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + GROUP BY rsrc_static + + ), + {%- endif %} +{% endif -%} + +{# + We deduplicate each source over hashkey + ldts + rsrc_static and if is_incremental only select the rows, where the ldts is later + than the latest one in the existing satellite for that rsrc_static. If a source is added to the existing satellite, all deduplicated + rows from that source are loaded into the satellite. +#} + +{%- for source_model in source_models %} + + {%- set source_number = source_model.id | string -%} + {%- set hk_column = source_model['hk_column'] -%} + {%- if ns.has_rsrc_static_defined -%} + {%- set rsrc_statics = ns.source_models_rsrc_dict[source_number|string] -%} + + src_new_{{ source_number }} AS ( + {%- for rsrc_static in rsrc_statics %} + SELECT DISTINCT + {{ hk_column }} AS {{ tracked_hashkey }}, + {{ src_ldts }}, + CAST('{{ rsrc_static }}' AS {{ rsrc_default_dtype }} ) AS {{ src_rsrc }}, + CAST(UPPER('{{ source_model.name }}') AS {{ stg_default_dtype }}) AS {{ src_stg }} + FROM {{ ref(source_model.name) }} src + + + {%- if is_incremental() and ns.has_rsrc_static_defined and ns.source_included_before[source_number|int] and not disable_hwm %} + INNER JOIN max_ldts_per_rsrc_static_in_target max + ON max.rsrc_static = '{{ rsrc_static }}' + WHERE src.{{ src_ldts }} > max.max_ldts + {%- endif %} + {%- if not loop.last %} + UNION ALL + {% endif -%} + {% endfor %} + + ), + {%- else -%} + src_new_{{ source_number}} AS ( + SELECT DISTINCT + {{ hk_column }} AS {{ tracked_hashkey }}, + {{ src_ldts }}, + CAST({{ src_rsrc }} AS {{ rsrc_default_dtype }}) AS {{ src_rsrc }}, + CAST(UPPER('{{ source_model.name }}') AS {{ stg_default_dtype }}) AS {{ src_stg }} + FROM {{ ref(source_model.name) }} src + {%- if is_incremental() and source_models | length == 1 and not disable_hwm %} + WHERE src.{{ src_ldts }} > ( + SELECT MAX({{ src_ldts }}) + FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + ) + {%- endif %} + ), + {%- endif -%} + + {%- set ns.last_cte = "src_new_{}".format(source_number) %} + +{% endfor %} + +{# + If more than one source model is selected, all previously created deduplicated CTEs are unionized. +#} + +{%- if source_models | length > 1 %} + +source_new_union AS ( + {% for source_model in source_models %} + {%- set hk_column = source_model['hk_column'] -%} + {%- set source_number = source_model.id | string -%} + + SELECT + {{ tracked_hashkey }}, + {{ src_ldts }}, + {{ src_rsrc }}, + {{ src_stg }} + FROM src_new_{{ source_number }} + + {%- if not loop.last %} + UNION ALL + {% endif -%} + + {% endfor %} + + {%- set ns.last_cte = 'source_new_union' -%} + +), + +{%- endif -%} + +{# + Selecting everything, either from the unionized data, or from the single CTE (if single source). Checking against the existing + satellite to only inserts that are not already inserted, if incremental run. +#} + +records_to_insert AS ( + + SELECT + {{ datavault4dbt.print_list(final_columns_to_select) }} + FROM {{ ns.last_cte }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + AND {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, beginning_of_all_times) }} + {%- if is_incremental() %} + AND {{ datavault4dbt.concat_ws(concat_columns) }} NOT IN (SELECT * FROM distinct_concated_target) + {% endif %} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} diff --git a/macros/tables/databricks/ref_hub.sql b/macros/tables/databricks/ref_hub.sql new file mode 100644 index 00000000..bfc3798d --- /dev/null +++ b/macros/tables/databricks/ref_hub.sql @@ -0,0 +1,215 @@ +{%- macro databricks__ref_hub(ref_keys, src_ldts, src_rsrc, source_models) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set ns = namespace(last_cte= "", source_included_before = {}, has_rsrc_static_defined=true, source_models_rsrc_dict={}) -%} + +{%- set ref_keys = datavault4dbt.expand_column_list(columns=[ref_keys]) -%} + +{# If no specific ref_keys is defined for each source, we apply the values set in the ref_keys variable. #} +{# If no rsrc_static parameter is defined in ANY of the source models then the whole code block of record_source performance lookup is not executed #} +{# For the use of record_source performance lookup it is required that every source model has the parameter rsrc_static defined and it cannot be an empty string #} +{%- if source_models is not mapping and not datavault4dbt.is_list(source_models) -%} + {%- set source_models = {source_models: {}} -%} +{%- endif -%} + +{%- set source_model_values = fromjson(datavault4dbt.source_model_processing(source_models=source_models, parameters={'test':'test'}, reference_keys=ref_keys)) -%} +{%- set source_models = source_model_values['source_model_list'] -%} +{%- set ns.has_rsrc_static_defined = source_model_values['has_rsrc_static_defined'] -%} +{%- set ns.source_models_rsrc_dict = source_model_values['source_models_rsrc_dict'] -%} +{{ log('source_models: '~source_models, false) }} + +{%- set final_columns_to_select = ref_keys + [src_ldts] + [src_rsrc] -%} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{% if is_incremental() -%} +{# Get all target ref_keys out of the existing ref_table for later incremental logic. #} + distinct_target_ref_keys AS ( + + SELECT + {{ datavault4dbt.concat_ws(ref_keys) }} + FROM {{ this }} + + ), + {%- if ns.has_rsrc_static_defined -%} + {% for source_model in source_models %} + {# Create a query with a rsrc_static column with each rsrc_static for each source model. #} + {%- set source_number = source_model.id | string -%} + {%- set rsrc_statics = ns.source_models_rsrc_dict[source_number] -%} + + {{log('rsrc_statics: '~ rsrc_statics, false) }} + + {%- set rsrc_static_query_source -%} + SELECT count(*) FROM ( + {%- for rsrc_static in rsrc_statics -%} + SELECT t.{{ src_rsrc }}, + '{{ rsrc_static }}' AS rsrc_static + FROM {{ this }} t + WHERE {{ src_rsrc }} like '{{ rsrc_static }}' + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor -%} + ) + {% endset %} + + rsrc_static_{{ source_number }} AS ( + {%- for rsrc_static in rsrc_statics -%} + SELECT + t.{{ src_ldts }}, + '{{ rsrc_static }}' AS rsrc_static + FROM {{ this }} t + WHERE {{ src_rsrc }} LIKE '{{ rsrc_static }}' + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor -%} + {%- set ns.last_cte = "rsrc_static_{}".format(source_number) -%} + ), + + {%- set source_in_target = true -%} + + {%- if execute -%} + {%- set rsrc_static_result = run_query(rsrc_static_query_source) -%} + + {%- set row_count = rsrc_static_result.columns[0].values()[0] -%} + + {{ log('row_count for '~source_model~' is '~row_count, false) }} + + {%- if row_count == 0 -%} + {%- set source_in_target = false -%} + {%- endif -%} + {%- endif -%} + + + {%- do ns.source_included_before.update({source_model.id: source_in_target}) -%} + + {% endfor -%} + + {%- if source_models | length > 1 %} + + rsrc_static_union AS ( + {# Create one unionized table over all sources. It will be the same as the already existing + hub, but extended by the rsrc_static column. #} + {% for source_model in source_models %} + {%- set source_number = source_model.id | string -%} + + SELECT rsrc_static_{{ source_number }}.* FROM rsrc_static_{{ source_number }} + + {%- if not loop.last %} + UNION ALL + {% endif -%} + {%- endfor %} + {%- set ns.last_cte = "rsrc_static_union" -%} + ), + + {%- endif %} + + max_ldts_per_rsrc_static_in_target AS ( + {# Use the previously created CTE to calculate the max load date timestamp per rsrc_static. #} + SELECT + rsrc_static, + MAX({{ src_ldts }}) as max_ldts + FROM {{ ns.last_cte }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + GROUP BY rsrc_static + + ), + {%- endif %} +{% endif -%} + +{% for source_model in source_models %} + + {%- set source_number = source_model.id | string -%} + + {%- if ns.has_rsrc_static_defined -%} + {%- set rsrc_statics = ns.source_models_rsrc_dict.id -%} + {%- endif -%} + + + src_new_{{ source_number }} AS ( + + SELECT + {% for ref_key in source_model['ref_keys'] -%} + {{ ref_key}}, + {% endfor -%} + + {{ src_ldts }}, + {{ src_rsrc }} + FROM {{ ref(source_model.name) }} src + + {%- if is_incremental() and ns.has_rsrc_static_defined and ns.source_included_before[source_number] %} + INNER JOIN max_ldts_per_rsrc_static_in_target max ON + ({%- for rsrc_static in rsrc_statics -%} + max.rsrc_static = '{{ rsrc_static }}' + {%- if not loop.last -%} OR + {% endif -%} + {%- endfor %}) + WHERE src.{{ src_ldts }} > max.max_ldts + {%- endif %} + + {%- set ns.last_cte = "src_new_{}".format(source_number) %} + + ), +{%- endfor -%} + +{%- if source_models | length > 1 %} + +source_new_union AS ( + + {%- for source_model in source_models -%} + + {%- set source_number = source_model.id | string -%} + + SELECT + {% for ref_key in source_model['ref_keys'] -%} + {{ ref_key }} AS {{ ref_keys[loop.index - 1] }}, + {% endfor -%} + + {{ src_ldts }}, + {{ src_rsrc }} + FROM src_new_{{ source_number }} + + {%- if not loop.last %} + UNION ALL + {% endif -%} + + {%- endfor -%} + + {%- set ns.last_cte = 'source_new_union' -%} + +), + +{%- endif %} + +earliest_ref_key_over_all_sources AS ( + + {#- Deduplicate the unionized records to only insert the earliest one. #} + SELECT + lcte.* + FROM {{ ns.last_cte }} AS lcte + + QUALIFY ROW_NUMBER() OVER (PARTITION BY {%- for ref_key in ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) = 1 + + {%- set ns.last_cte = 'earliest_ref_key_over_all_sources' -%} + +), + +records_to_insert AS ( + {#- Select everything from the previous CTE, if incremental filter for hashkeys that are not already in the hub. #} + SELECT + {{ datavault4dbt.print_list(final_columns_to_select) }} + FROM {{ ns.last_cte }} + + {%- if is_incremental() %} + WHERE {{ datavault4dbt.concat_ws(ref_keys) }} NOT IN (SELECT * FROM distinct_target_ref_keys) + {% endif -%} +) + +SELECT * FROM records_to_insert + +{%- endmacro -%} diff --git a/macros/tables/databricks/ref_sat_v0.sql b/macros/tables/databricks/ref_sat_v0.sql new file mode 100644 index 00000000..8670effc --- /dev/null +++ b/macros/tables/databricks/ref_sat_v0.sql @@ -0,0 +1,114 @@ +{%- macro databricks__ref_sat_v0(parent_ref_keys, src_hashdiff, src_payload, src_ldts, src_rsrc, source_model, disable_hwm, source_is_single_batch) -%} + +{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%} +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set parent_ref_keys = datavault4dbt.expand_column_list(columns=[parent_ref_keys]) -%} + +{%- set ns=namespace(src_hashdiff="", hdiff_alias="") %} + +{%- if src_hashdiff is mapping and src_hashdiff is not none -%} + {% set ns.src_hashdiff = src_hashdiff["source_column"] %} + {% set ns.hdiff_alias = src_hashdiff["alias"] %} +{% else %} + {% set ns.src_hashdiff = src_hashdiff %} + {% set ns.hdiff_alias = src_hashdiff %} +{%- endif -%} + +{%- set source_cols = datavault4dbt.expand_column_list(columns=[src_rsrc, src_ldts, src_payload]) -%} + +{%- set source_relation = ref(source_model) -%} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# Selecting all source data, that is newer than latest data in ref_sat if incremental #} +source_data AS ( + + SELECT + {% for ref_key in parent_ref_keys %} + {{ref_key}}, + {% endfor %} + {{ ns.src_hashdiff }} as {{ ns.hdiff_alias }}, + {{ datavault4dbt.print_list(source_cols) }} + FROM {{ source_relation }} + + {%- if is_incremental() and not disable_hwm %} + WHERE {{ src_ldts }} > ( + SELECT + MAX({{ src_ldts }}) FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + ) + {%- endif %} +), + +{# Get the latest record for each parent ref key combination in existing sat, if incremental. #} +{%- if is_incremental() %} +latest_entries_in_sat AS ( + + SELECT + {% for ref_key in parent_ref_keys %} + {{ref_key}}, + {% endfor %} + {{ ns.hdiff_alias }} + FROM + {{ this }} + QUALIFY ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }} DESC) = 1 +), +{%- endif %} + +{# + Deduplicate source by comparing each hashdiff to the hashdiff of the previous record, for each parent ref key combination. + Additionally adding a row number based on that order, if incremental. +#} +deduplicated_numbered_source AS ( + + SELECT + {% for ref_key in parent_ref_keys %} + {{ref_key}}, + {% endfor %} + {{ ns.hdiff_alias }}, + {{ datavault4dbt.print_list(source_cols) }} + {% if is_incremental() -%} + , ROW_NUMBER() OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) as rn + {%- endif %} + FROM source_data + QUALIFY + CASE + WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {%- for ref_key in parent_ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}) THEN FALSE + ELSE TRUE + END +), + +{# + Select all records from the previous CTE. If incremental, compare the oldest incoming entry to + the existing records in the satellite. +#} +records_to_insert AS ( + + SELECT + {% for ref_key in parent_ref_keys %} + {{ref_key}}, + {% endfor %} + {{ ns.hdiff_alias }}, + {{ datavault4dbt.print_list(source_cols) }} + FROM deduplicated_numbered_source + {%- if is_incremental() %} + WHERE NOT EXISTS ( + SELECT 1 + FROM latest_entries_in_sat + WHERE 1=1 + {% for ref_key in parent_ref_keys %} + AND {{ datavault4dbt.multikey(ref_key, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }} + {% endfor %} + AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }} + AND deduplicated_numbered_source.rn = 1) + {%- endif %} + + ) + +SELECT * FROM records_to_insert + +{%- endmacro -%} diff --git a/macros/tables/databricks/ref_sat_v1.sql b/macros/tables/databricks/ref_sat_v1.sql new file mode 100644 index 00000000..061bbdc0 --- /dev/null +++ b/macros/tables/databricks/ref_sat_v1.sql @@ -0,0 +1,54 @@ +{%- macro databricks__ref_sat_v1(ref_sat_v0, ref_keys, hashdiff, src_ldts, src_rsrc, ledts_alias, add_is_current_flag) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set is_current_col_alias = var('datavault4dbt.is_current_col_alias', 'IS_CURRENT') -%} + +{%- set source_relation = ref(ref_sat_v0) -%} + +{%- set ref_keys = datavault4dbt.expand_column_list(columns=[ref_keys]) -%} + +{%- set all_columns = datavault4dbt.source_columns(source_relation=source_relation) -%} +{%- set exclude = ref_keys + [hashdiff, src_ldts, src_rsrc] -%} + +{%- set source_columns_to_select = datavault4dbt.process_columns_to_select(all_columns, exclude) -%} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# Calculate ledts based on the ldts of the earlier record. #} +end_dated_source AS ( + + SELECT + {% for ref_key in ref_keys %} + {{ref_key}}, + {% endfor %} + {{ hashdiff }}, + {{ src_rsrc }}, + {{ src_ldts }}, + COALESCE(LEAD(TRY_SUBTRACT({{ src_ldts }}, INTERVAL 1 MICROSECOND)) OVER (PARTITION BY {%- for ref_key in ref_keys %} {{ref_key}} {%- if not loop.last %}, {% endif %}{% endfor %} ORDER BY {{ src_ldts }}),{{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}) as {{ ledts_alias }}, + {{ datavault4dbt.print_list(source_columns_to_select) }} + FROM {{ source_relation }} + +) + +SELECT + {% for ref_key in ref_keys %} + {{ref_key}}, + {% endfor %} + {{ hashdiff }}, + {{ src_rsrc }}, + {{ src_ldts }}, + {{ ledts_alias }}, + {%- if add_is_current_flag %} + CASE WHEN {{ ledts_alias }} = {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + THEN TRUE + ELSE FALSE + END AS {{ is_current_col_alias }}, + {% endif -%} + {{ datavault4dbt.print_list(source_columns_to_select) }} +FROM end_dated_source + +{%- endmacro -%} diff --git a/macros/tables/databricks/ref_table.sql b/macros/tables/databricks/ref_table.sql new file mode 100644 index 00000000..82fcf782 --- /dev/null +++ b/macros/tables/databricks/ref_table.sql @@ -0,0 +1,145 @@ +{%- macro databricks__ref_table(ref_hub, ref_satellites, src_ldts, src_rsrc, historized, snapshot_trigger_column='is_active', snapshot_relation=none) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set ref_hub_relation = ref(ref_hub|string) -%} + +{%- set is_current_col_alias = var('datavault4dbt.is_current_col_alias', 'IS_CURRENT') -%} +{%- set ledts_alias = var('datavault4dbt.ledts_alias', 'ledts') -%} +{%- set sdts_alias = var('datavault4dbt.sdts_alias', 'sdts') -%} + +{%- set include_business_objects_before_appearance = var('datavault4dbt.include_business_objects_before_appearance', 'false') -%} + +{{ log('ref_hub_relation: ' ~ ref_hub_relation, false) }} +{%- set hub_columns = datavault4dbt.source_columns(ref_hub_relation) -%} +{{ log('hub_columns: ' ~ hub_columns, false) }} +{%- set hub_columns_to_exclude = [src_ldts, src_rsrc] -%} +{%- set ref_key_cols = datavault4dbt.process_columns_to_select(columns_list=hub_columns, exclude_columns_list=hub_columns_to_exclude )%} +{{ log('ref_key_cols: ' ~ ref_key_cols, false) }} +{%- set sat_columns_to_exclude = [src_ldts, src_rsrc, ledts_alias, is_current_col_alias] + ref_key_cols -%} +{{ log('sat_columns_to_exclude: '~ sat_columns_to_exclude, false) }} + +{%- set ref_satellites_dict = {} -%} + +{%- if not datavault4dbt.is_list(ref_satellites) and not ref_satellites is mapping -%} + {%- set ref_satellites = [ref_satellites] -%} +{%- endif -%} + +{%- if datavault4dbt.is_list(ref_satellites) -%} + {%- for ref_satellite in ref_satellites -%} + {%- do ref_satellites_dict.update({ref_satellite:{}}) -%} + {%- endfor -%} +{%- else -%} + {%- set ref_satellites_dict = ref_satellites -%} +{%- endif -%} + + +WITH + +dates AS ( + +{% if historized in ['full', 'latest'] -%} + + {%- set date_column = src_ldts -%} + + + {{ log('ref_satellites: '~ ref_satellites, false) -}} + + {% if historized == 'full' -%} + SELECT distinct {{ date_column }} FROM ( + {%- elif historized == 'latest' -%} + SELECT MAX({{ date_column }}) as {{ date_column }} FROM ( + {%- endif -%} + + {% for satellite in ref_satellites_dict.keys() -%} + SELECT distinct + {{ src_ldts }} + FROM {{ ref(satellite|string) }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + {% if not loop.last -%} UNION {% endif %} + {%- endfor %} + ) + + +{% elif snapshot_relation is not none %} + + {%- set date_column = sdts_alias -%} + + SELECT + {{ date_column }} + FROM ( + + SELECT + {{ sdts_alias }} + FROM {{ ref(snapshot_relation) }} + WHERE {{ snapshot_trigger_column }} + ) + +{%- endif %} + +{%- if is_incremental() -%} + WHERE {{ date_column }} > (SELECT MAX({{ date_column }}) FROM {{ this }}) +{%- endif -%} + + +), + +ref_table AS ( + + SELECT + {{ datavault4dbt.print_list(list_to_print=ref_key_cols, indent=2, src_alias='h') }}, + ld.{{ date_column }}, + h.{{ src_rsrc }}, + + {%- for satellite in ref_satellites_dict.keys() %} + + {%- set sat_alias = 's_' + loop.index|string -%} + {%- set sat_columns_pre = [] -%} + + {%- if ref_satellites_dict[satellite] is mapping and 'include' in ref_satellites_dict[satellite].keys() -%} + {%- set sat_columns_pre = ref_satellites_dict[satellite]['include'] -%} + {%- elif ref_satellites_dict[satellite] is mapping and 'exclude' in ref_satellites_dict[satellite].keys() -%} + {%- set all_sat_columns = datavault4dbt.source_columns(ref(satellite)) -%} + {%- set sat_columns_pre = datavault4dbt.process_columns_to_select(all_sat_columns, ref_satellites_dict[satellite]['exclude']) -%} + {%- elif datavault4dbt.is_list(ref_satellites_dict[satellite]) -%} + {%- set sat_columns_pre = ref_satellites_dict[satellite] -%} + {%- else -%} + {%- set all_sat_columns = datavault4dbt.source_columns(ref(satellite)) -%} + {%- set sat_columns_pre = datavault4dbt.process_columns_to_select(all_sat_columns, sat_columns_to_exclude) -%} + {%- endif -%} + + {%- set sat_columns = datavault4dbt.process_columns_to_select(sat_columns_pre, sat_columns_to_exclude) -%} + + {{- log('sat_columns: '~ sat_columns, false) -}} + + {{ datavault4dbt.print_list(list_to_print=sat_columns, indent=2, src_alias=sat_alias) }} + {%- if not loop.last -%} , + {% endif -%} + + {% endfor %} + + FROM {{ ref(ref_hub) }} h + + FULL OUTER JOIN dates ld + ON 1 = 1 + + {% for satellite in ref_satellites_dict.keys() %} + + {%- set sat_alias = 's_' + loop.index|string -%} + + LEFT JOIN {{ ref(satellite) }} {{ sat_alias }} + ON {{ datavault4dbt.multikey(columns=ref_key_cols, prefix=['h', sat_alias], condition='=') }} + AND ld.{{ date_column }} BETWEEN {{ sat_alias }}.{{ src_ldts }} AND {{ sat_alias }}.{{ ledts_alias }} + + {% endfor %} + + {% if include_business_objects_before_appearance == 'false' -%} + WHERE h.{{ src_ldts }} <= ld.{{ date_column }} + {% endif %} + +) + +SELECT * FROM ref_table + +{%- endmacro -%} \ No newline at end of file diff --git a/macros/tables/databricks/sat_v0.sql b/macros/tables/databricks/sat_v0.sql new file mode 100644 index 00000000..98a3ea0e --- /dev/null +++ b/macros/tables/databricks/sat_v0.sql @@ -0,0 +1,101 @@ +{%- macro databricks__sat_v0(parent_hashkey, src_hashdiff, src_payload, src_ldts, src_rsrc, source_model, disable_hwm, source_is_single_batch) -%} + +{%- set beginning_of_all_times = datavault4dbt.beginning_of_all_times() -%} +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set ns=namespace(src_hashdiff="", hdiff_alias="") %} + +{%- if src_hashdiff is mapping and src_hashdiff is not none -%} + {% set ns.src_hashdiff = src_hashdiff["source_column"] %} + {% set ns.hdiff_alias = src_hashdiff["alias"] %} +{% else %} + {% set ns.src_hashdiff = src_hashdiff %} + {% set ns.hdiff_alias = src_hashdiff %} +{%- endif -%} + +{%- set source_cols = datavault4dbt.expand_column_list(columns=[src_rsrc, src_ldts, src_payload]) -%} + +{%- set source_relation = ref(source_model) -%} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# Selecting all source data, that is newer than latest data in sat if incremental #} +source_data AS ( + + SELECT + {{ parent_hashkey }}, + {{ ns.src_hashdiff }} as {{ ns.hdiff_alias }}, + {{ datavault4dbt.print_list(source_cols) }} + FROM {{ source_relation }} + + {%- if is_incremental() %} + WHERE {{ src_ldts }} > ( + SELECT + MAX({{ src_ldts }}) FROM {{ this }} + WHERE {{ src_ldts }} != {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + ) + {%- endif %} +), + +{# Get the latest record for each parent hashkey in existing sat, if incremental. #} +{%- if is_incremental() %} +latest_entries_in_sat AS ( + + SELECT + {{ parent_hashkey }}, + {{ ns.hdiff_alias }} + FROM + {{ this }} + QUALIFY ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }} DESC) = 1 +), +{%- endif %} + +{# + Deduplicate source by comparing each hashdiff to the hashdiff of the previous record, for each hashkey. + Additionally adding a row number based on that order, if incremental. +#} +deduplicated_numbered_source AS ( + + SELECT + {{ parent_hashkey }}, + {{ ns.hdiff_alias }}, + {{ datavault4dbt.print_list(source_cols) }} + {% if is_incremental() -%} + , ROW_NUMBER() OVER(PARTITION BY {{ parent_hashkey }} ORDER BY {{ src_ldts }}) as rn + {%- endif %} + FROM source_data + QUALIFY + CASE + WHEN {{ ns.hdiff_alias }} = LAG({{ ns.hdiff_alias }}) OVER(PARTITION BY {{ parent_hashkey|lower }} ORDER BY {{ src_ldts }}) THEN FALSE + ELSE TRUE + END +), + +{# + Select all records from the previous CTE. If incremental, compare the oldest incoming entry to + the existing records in the satellite. +#} +records_to_insert AS ( + + SELECT + {{ parent_hashkey }}, + {{ ns.hdiff_alias }}, + {{ datavault4dbt.print_list(source_cols) }} + FROM deduplicated_numbered_source + {%- if is_incremental() %} + WHERE NOT EXISTS ( + SELECT 1 + FROM latest_entries_in_sat + WHERE {{ datavault4dbt.multikey(parent_hashkey, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }} + AND {{ datavault4dbt.multikey(ns.hdiff_alias, prefix=['latest_entries_in_sat', 'deduplicated_numbered_source'], condition='=') }} + AND deduplicated_numbered_source.rn = 1) + {%- endif %} + + ) + +SELECT * FROM records_to_insert + +{%- endmacro -%} diff --git a/macros/tables/databricks/sat_v1.sql b/macros/tables/databricks/sat_v1.sql new file mode 100644 index 00000000..10f6077c --- /dev/null +++ b/macros/tables/databricks/sat_v1.sql @@ -0,0 +1,52 @@ +{%- macro databricks__sat_v1(sat_v0, hashkey, hashdiff, src_ldts, src_rsrc, ledts_alias, add_is_current_flag, include_payload) -%} + +{%- set end_of_all_times = datavault4dbt.end_of_all_times() -%} +{%- set timestamp_format = datavault4dbt.timestamp_format() -%} + +{%- set is_current_col_alias = var('datavault4dbt.is_current_col_alias', 'IS_CURRENT') -%} + +{%- set source_relation = ref(sat_v0) -%} + +{%- set all_columns = datavault4dbt.source_columns(source_relation=source_relation) -%} +{%- set exclude = [hashkey, hashdiff, src_ldts, src_rsrc] -%} + +{%- set source_columns_to_select = datavault4dbt.process_columns_to_select(all_columns, exclude) -%} + +{{ datavault4dbt.prepend_generated_by() }} + +WITH + +{# Calculate ledts based on the ldts of the earlier record. #} +end_dated_source AS ( + + SELECT + {{ hashkey }}, + {{ hashdiff }}, + {{ src_rsrc }}, + {{ src_ldts }}, + COALESCE(LEAD(TRY_SUBTRACT({{ src_ldts }}, INTERVAL 1 MICROSECOND)) OVER (PARTITION BY {{ hashkey }} ORDER BY {{ src_ldts }}),{{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }}) as {{ ledts_alias }} + {%- if include_payload -%}, + {{ datavault4dbt.print_list(source_columns_to_select) }} + {%- endif %} + FROM {{ source_relation }} + +) + +SELECT + {{ hashkey }}, + {{ hashdiff }}, + {{ src_rsrc }}, + {{ src_ldts }}, + {{ ledts_alias }} + {%- if add_is_current_flag %}, + CASE WHEN {{ ledts_alias }} = {{ datavault4dbt.string_to_timestamp(timestamp_format, end_of_all_times) }} + THEN TRUE + ELSE FALSE + END AS {{ is_current_col_alias }} + {% endif -%} + {%- if include_payload -%}, + {{ datavault4dbt.print_list(source_columns_to_select) }} + {%- endif %} +FROM end_dated_source + +{%- endmacro -%}