Refactor daily & streaming support (Velir#188)
* update docs. update multisite macro

* remove reference to intraday model that was removed

* update dbt_utils version

* cleaning up comments

* Update README.md
adamribaudo-velir authored Jun 23, 2023
1 parent af6b381 commit fd26c3c
Showing 8 changed files with 43 additions and 122 deletions.
37 changes: 3 additions & 34 deletions README.md
@@ -72,16 +72,17 @@ packages:
```
## Required Variables

This package assumes that you have an existing DBT project with a BigQuery profile and a BigQuery GCP instance available with GA4 event data loaded. Source data is defined using the following variables which must be set in `dbt_project.yml`.
This package assumes that you have an existing DBT project with a BigQuery profile and a BigQuery GCP instance available with GA4 event data loaded. Source data is defined using the `project` and `dataset` variables below. The `static_incremental_days` variable defines how many days' worth of data to reprocess during incremental runs.

```
vars:
ga4:
project: "your_gcp_project"
dataset: "your_ga4_dataset"
start_date: "YYYYMMDD" # Earliest date to load
frequency: "daily" # daily|streaming|daily+streaming. See 'Export Frequency' below.
static_incremental_days: 3 # Number of days to scan and reprocess on each run
```
See [Multi-Property Support](#multi-property-support) section for details on configuring multiple GA4 properties as a source.

## Optional Variables

@@ -254,36 +255,6 @@ vars:
- name: "some_other_parameter"
value_type: "string_value"
```

# Incremental Loading of Event Data (and how to handle late-arriving hits)

By default, GA4 exports data into sharded event tables that use the event date as the table suffix in the format of `events_YYYYMMDD` or `events_intraday_YYYYMMDD`. This package incrementally loads data from these tables into `base_ga4__events` which is partitioned on date. There are two incremental loading strategies available:

- Dynamic incremental partitions (Default) - This strategy queries the destination table to find the latest date available. Data beyond that date is loaded incrementally on each run.
- Static incremental partitions - This strategy is enabled when the `static_incremental_days` variable is set to an integer. It incrementally loads the last X days' worth of data regardless of what data is already present. Google updates the daily event tables within the last 72 hours to handle late-arriving hits, so use this strategy if late-arriving hits are a concern; the dynamic incremental strategy will not re-process past date tables. Ex: a `static_incremental_days` setting of `3` would load data from `current_date - 1`, `current_date - 2`, and `current_date - 3`. Note that `current_date` uses UTC as the timezone.
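
For illustration, a hedged sketch of the scan this produces against the sharded source tables (note that the model's `partitions_to_replace` list also includes `current_date`; the project and dataset names below are placeholders):

```
-- sketch only: which date shards a run with static_incremental_days: 3 would re-read
select *
from `your_gcp_project.your_ga4_dataset.events_*`
where parse_date('%Y%m%d', _table_suffix) in (
  current_date,
  date_sub(current_date, interval 1 day),
  date_sub(current_date, interval 2 day),
  date_sub(current_date, interval 3 day)
)
```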

# Export Frequency

The value of the `frequency` variable should match the "Frequency" setting on GA4's BigQuery Linking Admin page.

| GA4 | dbt_project.yml |
|-----|-----------------|
| Daily | "daily" |
| Streaming | "streaming" |
| both Daily and Streaming | "daily+streaming" |

The daily option (default) is for sites that use only the daily batch export. It can also be used as a substitute for the "daily+streaming" option when you don't care about including today's data, so it doesn't strictly need to match the GA4 "Frequency" setting.
The streaming option is for sites that only use the streaming export. The streaming export is not constrained by Google's one-million-event daily limit, so it is the best option for sites that may exceed that limit. Selecting both "Daily" and "Streaming" in GA4 causes the streaming, intraday BigQuery tables to be deleted when the daily, batch tables are updated.
The "daily+streaming" option uses the daily batch export and unions the streaming intraday tables. It is intended to append today's data from the streaming intraday tables to the batch tables.

Example:

```
vars:
ga4:
frequency: "daily+streaming"
```

# Connecting to BigQuery

This package assumes that BigQuery is the source of your GA4 data. Full instructions for connecting DBT to BigQuery are here: https://docs.getdbt.com/reference/warehouse-profiles/bigquery-profile
@@ -324,8 +295,6 @@ vars:

With these variables set, the `combine_property_data` macro will run as a pre-hook to `base_ga4_events` and clone shards to the target dataset. The number of days' worth of data to clone during incremental runs will be based on the `static_incremental_days` variable.
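
For context, a hedged sketch of the variables this relies on (the property IDs and day count below are placeholder values, not ones taken from this repository):

```
vars:
  ga4:
    property_ids: [11111111, 22222222] # placeholder GA4 property IDs
    static_incremental_days: 3 # placeholder; days of shards to re-clone on incremental runs
```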

When the frequency variable is set to `daily` or `daily+streaming`, the `events_*` tables will be copied and intraday tables will be ignored. When the frequency is set to `streaming`, only the `events_intraday_*` tables will be copied.

Jobs that run a large number of clone operations are prone to timing out. As a result, it is recommended that you increase the query timeout when you need to backfill or full-refresh the table (for example, during initial setup or after the base model is modified). Otherwise, it is best to prevent the base model from rebuilding on full refreshes unless needed, to minimize timeouts.
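
One way to raise the timeout, sketched under the assumption that you connect through the dbt BigQuery adapter and can set `job_execution_timeout_seconds` in your profile (the profile, project, and dataset names below are placeholders):

```
# profiles.yml (sketch, placeholder names)
my_bigquery_profile:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: oauth
      project: your_gcp_project
      dataset: your_ga4_dataset
      threads: 4
      job_execution_timeout_seconds: 1800 # give clone-heavy backfills more time
```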

9 changes: 4 additions & 5 deletions macros/combine_property_data.sql
@@ -6,33 +6,32 @@

create schema if not exists `{{var('project')}}.{{var('dataset')}}`;

-- If incremental, then use static_incremental_days variable to find earliest shard to copy
{# If incremental, then use static_incremental_days variable to find earliest shard to copy #}
{% if not should_full_refresh() %}
{% set earliest_shard_to_retrieve = (modules.datetime.date.today() - modules.datetime.timedelta(days=var('static_incremental_days')))|string|replace("-", "")|int %}
{% else %}
-- Otherwise use 'start_date' variable
{# Otherwise use 'start_date' variable #}

{% set earliest_shard_to_retrieve = var('start_date')|int %}
{% endif %}

{% for property_id in var('property_ids') %}
{%- set schema_name = "analytics_" + property_id|string -%}
{%- if var('frequency', 'daily') == 'streaming' -%}
{# Copy intraday tables #}
{%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_intraday_%', database=var('project')) -%}
{% for relation in relations %}
{%- set relation_suffix = relation.identifier|replace('events_intraday_', '') -%}
{%- if relation_suffix|int >= earliest_shard_to_retrieve|int -%}
CREATE OR REPLACE TABLE `{{var('project')}}.{{var('dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}` CLONE `{{var('project')}}.analytics_{{property_id}}.events_intraday_{{relation_suffix}}`;
{%- endif -%}
{% endfor %}
{%- else -%}
{# Copy daily tables #}
{%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_%', exclude='events_intraday_%', database=var('project')) -%}
{% for relation in relations %}
{%- set relation_suffix = relation.identifier|replace('events_', '') -%}
{%- if relation_suffix|int >= earliest_shard_to_retrieve|int -%}
CREATE OR REPLACE TABLE `{{var('project')}}.{{var('dataset')}}.events_{{relation_suffix}}{{property_id}}` CLONE `{{var('project')}}.analytics_{{property_id}}.events_{{relation_suffix}}`;
{%- endif -%}
{% endfor %}
{%- endif -%}
{% endfor %}
{% endmacro %}
80 changes: 34 additions & 46 deletions models/staging/base/base_ga4__events.sql
@@ -1,61 +1,49 @@
{% if var('static_incremental_days', false ) %}
{% set partitions_to_replace = ['current_date'] %}
{% for i in range(var('static_incremental_days')) %}
{% set partitions_to_replace = partitions_to_replace.append('date_sub(current_date, interval ' + (i+1)|string + ' day)') %}
{% endfor %}
{{
config(
pre_hook="{{ ga4.combine_property_data() }}" if var('property_ids', false) else "",
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by={
"field": "event_date_dt",
"data_type": "date",
},
partitions = partitions_to_replace,
cluster_by=['event_name']
)
}}
{% else %}
{{
config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by={
"field": "event_date_dt",
"data_type": "date",
},
cluster_by=['event_name']
)
}}
{% endif %}
--BigQuery does not cache wildcard queries that scan across sharded tables which means it's best to materialize the raw event data as a partitioned table so that future queries benefit from caching
with source as (
{% set partitions_to_replace = ['current_date'] %}
{% for i in range(var('static_incremental_days')) %}
{% set partitions_to_replace = partitions_to_replace.append('date_sub(current_date, interval ' + (i+1)|string + ' day)') %}
{% endfor %}
{{
config(
pre_hook="{{ ga4.combine_property_data() }}" if var('property_ids', false) else "",
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by={
"field": "event_date_dt",
"data_type": "date",
},
partitions = partitions_to_replace,
cluster_by=['event_name']
)
}}

with source_daily as (
select
{{ ga4.base_select_source() }}
{% if var('frequency', 'daily') == 'streaming' %}
from {{ source('ga4', 'events_intraday') }}
where cast( _table_suffix as int64) >= {{var('start_date')}}
{% else %}
from {{ source('ga4', 'events') }}
where _table_suffix not like '%intraday%'
and cast( _table_suffix as int64) >= {{var('start_date')}}
{% if is_incremental() %}
and parse_date('%Y%m%d', left(_TABLE_SUFFIX, 8)) in ({{ partitions_to_replace | join(',') }})
{% endif %}
),
source_intraday as (
select
{{ ga4.base_select_source() }}
from {{ source('ga4', 'events_intraday') }}
where cast( _table_suffix as int64) >= {{var('start_date')}}
{% if is_incremental() %}

{% if var('static_incremental_days', false ) %}
and parse_date('%Y%m%d', left(_TABLE_SUFFIX, 8)) in ({{ partitions_to_replace | join(',') }})
{% else %}
-- Incrementally add new events. Filters on _TABLE_SUFFIX using the max event_date_dt value found in {{this}}
-- See https://docs.getdbt.com/reference/resource-configs/bigquery-configs#the-insert_overwrite-strategy
and parse_date('%Y%m%d',left(_TABLE_SUFFIX, 8)) >= _dbt_max_partition
{% endif %}
and parse_date('%Y%m%d', left(_TABLE_SUFFIX, 8)) in ({{ partitions_to_replace | join(',') }})
{% endif %}
),
unioned as (
select * from source_daily
union all
select * from source_intraday
),
renamed as (
select
{{ ga4.base_select_renamed() }}
from source
from unioned
)

select * from renamed
24 changes: 0 additions & 24 deletions models/staging/base/base_ga4__events_intraday.sql

This file was deleted.

6 changes: 0 additions & 6 deletions models/staging/base/base_ga4__events_intraday.yml

This file was deleted.

2 changes: 1 addition & 1 deletion models/staging/src_ga4.yml
@@ -10,4 +10,4 @@ sources:
description: Main events table exported by GA4. Sharded by date.
- name: events_intraday
identifier: events_intraday_*
description: Intraday events table which is optionally exported by GA4. Always contains events from the current day.
description: Intraday events table which is optionally exported by GA4.
5 changes: 0 additions & 5 deletions models/staging/stg_ga4__events.sql
@@ -1,11 +1,6 @@
-- This staging model contains key creation and window functions. Keeping window functions outside of the base incremental model ensures that the incremental updates don't artificially limit the window partition sizes (ex: if a session spans 2 days, but only 1 day is in the incremental update)

with base_events as (
select * from {{ ref('base_ga4__events')}}
{% if var('frequency', 'daily') == 'daily+streaming' %}
union all
select * from {{ref('base_ga4__events_intraday')}}
{% endif %}
),
-- Add key that captures a combination of stream_id and user_pseudo_id to uniquely identify a 'client' (aka. a device) within a single stream
include_client_key as (
2 changes: 1 addition & 1 deletion packages.yml
@@ -1,3 +1,3 @@
packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
version: 1.1.1
