Skip to content

Commit

Permalink
fix generic tests
Browse files Browse the repository at this point in the history
  • Loading branch information
amishas157 committed Sep 13, 2024
1 parent 0425e26 commit 4c9c4db
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 9 deletions.
1 change: 1 addition & 0 deletions models/staging/stg_history_ledgers.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ with
, batch_id
, batch_run_date
, batch_insert_ts
, total_byte_size_of_bucket_list
, '{{ var("airflow_start_timestamp") }}' as airflow_start_ts
from raw_table
)
Expand Down
4 changes: 2 additions & 2 deletions tests/eho_by_ops.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
-- any id present in the upstream table should be loaded in
-- the downstream. If records are not present, alert the team.
WITH find_missing AS (
SELECT op.id,
SELECT op.op_id,
op.batch_run_date,
op.batch_id
FROM {{ ref('stg_history_operations') }} op
LEFT OUTER JOIN {{ ref('enriched_history_operations') }} eho
ON op.id = eho.op_id
ON op.op_id = eho.op_id
WHERE eho.op_id IS NULL
-- Scan only the last 24 hours of data. Alert runs intraday so failures
-- are caught and resolved quickly.
Expand Down
6 changes: 3 additions & 3 deletions tests/ledger_sequence_increment.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@
with
ledger_sequence as (
select
id
ledger_id
, batch_id
, closed_at
, max(sequence) as max_sequence
from {{ ref('stg_history_ledgers') }}
where closed_at > TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 7 DAY )
group by id, batch_id, closed_at
group by ledger_id, batch_id, closed_at
)

, lead_sequence as (
select
id
ledger_id
, batch_id
, closed_at
, max_sequence
Expand Down
8 changes: 4 additions & 4 deletions tests/num_txns_and_ops.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
-- and transactions or operations were dropped from the dataset.
-- Get the actual count of transactions per ledger
WITH txn_count AS (
SELECT ledger_sequence, COUNT(id) as txn_transaction_count
SELECT ledger_sequence, COUNT(transaction_id) as txn_transaction_count
FROM {{ ref('stg_history_transactions') }}
--Take all ledgers committed in the last 36 hours to validate newly written data
-- Alert runs at 12pm UTC in GCP which creates the 36 hour interval
Expand All @@ -20,10 +20,10 @@ WITH txn_count AS (
),
-- Get the actual count of operations per ledger
operation_count AS (
SELECT A.ledger_sequence, COUNT(B.id) AS op_operation_count
SELECT A.ledger_sequence, COUNT(B.op_id) AS op_operation_count
FROM {{ ref('stg_history_transactions') }} A
JOIN {{ ref('stg_history_operations') }} B
ON A.id = B.transaction_id
ON A.transaction_id = B.transaction_id
WHERE TIMESTAMP(A.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
AND TIMESTAMP(B.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY )
GROUP BY A.ledger_sequence
Expand All @@ -32,7 +32,7 @@ WITH txn_count AS (
final_counts AS (
SELECT A.sequence, A.closed_at, A.batch_id,
A.tx_set_operation_count as expected_operation_count,
A.operation_count,
A.ledger_operation_count,
(A.failed_transaction_count + A.successful_transaction_count) as expected_transaction_count,
COALESCE(B.txn_transaction_count, 0) as actual_transaction_count,
COALESCE(C.op_operation_count, 0) as actual_operation_count
Expand Down

0 comments on commit 4c9c4db

Please sign in to comment.