From 4c9c4dbf6d17c3e780bfbe34e54ab43dd06a7a05 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Fri, 13 Sep 2024 14:46:26 +0530 Subject: [PATCH] fix generic tests --- models/staging/stg_history_ledgers.sql | 1 + tests/eho_by_ops.sql | 4 ++-- tests/ledger_sequence_increment.sql | 6 +++--- tests/num_txns_and_ops.sql | 8 ++++---- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/models/staging/stg_history_ledgers.sql b/models/staging/stg_history_ledgers.sql index 6b6315e..362c64c 100644 --- a/models/staging/stg_history_ledgers.sql +++ b/models/staging/stg_history_ledgers.sql @@ -34,6 +34,7 @@ with , batch_id , batch_run_date , batch_insert_ts + , total_byte_size_of_bucket_list , '{{ var("airflow_start_timestamp") }}' as airflow_start_ts from raw_table ) diff --git a/tests/eho_by_ops.sql b/tests/eho_by_ops.sql index 6691f4b..250b527 100644 --- a/tests/eho_by_ops.sql +++ b/tests/eho_by_ops.sql @@ -9,12 +9,12 @@ -- any id present in the upstream table should be loaded in -- the downstream. If records are not present, alert the team. WITH find_missing AS ( - SELECT op.id, + SELECT op.op_id, op.batch_run_date, op.batch_id FROM {{ ref('stg_history_operations') }} op LEFT OUTER JOIN {{ ref('enriched_history_operations') }} eho - ON op.id = eho.op_id + ON op.op_id = eho.op_id WHERE eho.op_id IS NULL -- Scan only the last 24 hours of data. Alert runs intraday so failures -- are caught and resolved quickly. diff --git a/tests/ledger_sequence_increment.sql b/tests/ledger_sequence_increment.sql index 59c72ef..8ca36e1 100644 --- a/tests/ledger_sequence_increment.sql +++ b/tests/ledger_sequence_increment.sql @@ -7,18 +7,18 @@ with ledger_sequence as ( select - id + ledger_id , batch_id , closed_at , max(sequence) as max_sequence from {{ ref('stg_history_ledgers') }} where closed_at > TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 7 DAY ) - group by id, batch_id, closed_at + group by ledger_id, batch_id, closed_at ) , lead_sequence as ( select - id + ledger_id , batch_id , closed_at , max_sequence diff --git a/tests/num_txns_and_ops.sql b/tests/num_txns_and_ops.sql index 06d161f..1f8c823 100644 --- a/tests/num_txns_and_ops.sql +++ b/tests/num_txns_and_ops.sql @@ -11,7 +11,7 @@ -- and transactions or operations were dropped from the dataset. -- Get the actual count of transactions per ledger WITH txn_count AS ( - SELECT ledger_sequence, COUNT(id) as txn_transaction_count + SELECT ledger_sequence, COUNT(transaction_id) as txn_transaction_count FROM {{ ref('stg_history_transactions') }} --Take all ledgers committed in the last 36 hours to validate newly written data -- Alert runs at 12pm UTC in GCP which creates the 36 hour interval @@ -20,10 +20,10 @@ WITH txn_count AS ( ), -- Get the actual count of operations per ledger operation_count AS ( - SELECT A.ledger_sequence, COUNT(B.id) AS op_operation_count + SELECT A.ledger_sequence, COUNT(B.op_id) AS op_operation_count FROM {{ ref('stg_history_transactions') }} A JOIN {{ ref('stg_history_operations') }} B - ON A.id = B.transaction_id + ON A.transaction_id = B.transaction_id WHERE TIMESTAMP(A.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) AND TIMESTAMP(B.batch_run_date) >= TIMESTAMP_SUB('{{ dbt_airflow_macros.ts(timezone=none) }}', INTERVAL 1 DAY ) GROUP BY A.ledger_sequence @@ -32,7 +32,7 @@ WITH txn_count AS ( final_counts AS ( SELECT A.sequence, A.closed_at, A.batch_id, A.tx_set_operation_count as expected_operation_count, - A.operation_count, + A.ledger_operation_count, (A.failed_transaction_count + A.successful_transaction_count) as expected_transaction_count, COALESCE(B.txn_transaction_count, 0) as actual_transaction_count, COALESCE(C.op_operation_count, 0) as actual_operation_count