Skip to content

Commit

Permalink
check for intraday table before processing union operation (Velir#250)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamribaudo-velir authored Jul 20, 2023
1 parent 7cfbd3e commit 78973a2
Showing 1 changed file with 31 additions and 19 deletions.
50 changes: 31 additions & 19 deletions models/staging/base/base_ga4__events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
{% for i in range(var('static_incremental_days')) %}
{% set partitions_to_replace = partitions_to_replace.append('date_sub(current_date, interval ' + (i+1)|string + ' day)') %}
{% endfor %}
{% if var('property_ids', false) == false %}
{% set relations_intraday = dbt_utils.get_relations_by_pattern(schema_pattern=var('dataset'), table_pattern='events_intraday_%', database=var('project')) %}
{% endif %}
{{
config(
pre_hook="{{ ga4.combine_property_data() }}" if var('property_ids', false) else "",
Expand All @@ -26,25 +29,34 @@ with source_daily as (
and parse_date('%Y%m%d', left(_TABLE_SUFFIX, 8)) in ({{ partitions_to_replace | join(',') }})
{% endif %}
),
source_intraday as (
select
{{ ga4.base_select_source() }}
from {{ source('ga4', 'events_intraday') }}
where cast( _table_suffix as int64) >= {{var('start_date')}}
{% if is_incremental() %}
and parse_date('%Y%m%d', left(_TABLE_SUFFIX, 8)) in ({{ partitions_to_replace | join(',') }})
{% endif %}
),
unioned as (
select * from source_daily
union all
select * from source_intraday
),
renamed as (
select
{{ ga4.base_select_renamed() }}
from unioned
)
-- Include intraday data if using a single-property configuration and the events_intraday_* table exists
{% if var('property_ids', false) == false and relations_intraday|length > 0 %}
source_intraday as (
select
{{ ga4.base_select_source() }}
from {{ source('ga4', 'events_intraday') }}
where cast( _table_suffix as int64) >= {{var('start_date')}}
{% if is_incremental() %}
and parse_date('%Y%m%d', left(_TABLE_SUFFIX, 8)) in ({{ partitions_to_replace | join(',') }})
{% endif %}
),
unioned as (
select * from source_daily
union all
select * from source_intraday
),
renamed as (
select
{{ ga4.base_select_renamed() }}
from unioned
)
{% else %}
renamed as (
select
{{ ga4.base_select_renamed() }}
from source_daily
)
{% endif%}

select * from renamed
qualify row_number() over(partition by event_date_dt, stream_id, user_pseudo_id, session_id, event_name, event_timestamp, to_json_string(event_params)) = 1

0 comments on commit 78973a2

Please sign in to comment.