From ec3b620dfc2437c7fb881f80ffe946079390e92d Mon Sep 17 00:00:00 2001
From: Matt Dowell
Date: Mon, 19 Jan 2026 16:24:08 -0800
Subject: [PATCH] Store oldest flowtime

---
 amiadapters/storage/snowflake.py | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/amiadapters/storage/snowflake.py b/amiadapters/storage/snowflake.py
index d700fbf..ba5423d 100644
--- a/amiadapters/storage/snowflake.py
+++ b/amiadapters/storage/snowflake.py
@@ -527,7 +527,9 @@ def _meter_tuple(self, meter: GeneralMeter, row_active_from: datetime):
         return tuple(result)
 
     def _upsert_reads(self, reads: List[GeneralMeterRead], conn, table_name="readings"):
-        self._verify_no_duplicate_reads(reads)
+        oldest_flowtime = self._verify_no_duplicate_reads_and_return_oldest_flowtime(
+            reads
+        )
 
         temp_table_name = f"temp_{table_name}"
         create_temp_table_sql = (
@@ -582,6 +584,12 @@ def _upsert_reads(self, reads: List[GeneralMeterRead], conn, table_name="reading
                 source.battery, source.install_date, source.connection, source.estimated)
         """
         conn.cursor().execute(merge_sql)
+        if oldest_flowtime is not None:
+            self.metrics.gauge(
+                "snowflake_storage_sink.oldest_flowtime_stored",
+                int(oldest_flowtime.timestamp()),
+                tags={"org_id": self.org_id},
+            )
 
     def _meter_read_tuple(self, read: GeneralMeterRead):
         result = [
@@ -611,8 +619,11 @@ def _verify_no_duplicate_meters(self, meters: List[GeneralMeter]):
                 )
             seen.add(key)
 
-    def _verify_no_duplicate_reads(self, reads: List[GeneralMeterRead]):
+    def _verify_no_duplicate_reads_and_return_oldest_flowtime(
+        self, reads: List[GeneralMeterRead]
+    ):
         seen = set()
+        oldest_flowtime = None
         for read in reads:
             key = (read.org_id, read.device_id, read.flowtime)
             if key in seen:
@@ -620,6 +631,9 @@ def _upsert_reads(self, reads: List[GeneralMeterRead], conn, table_name="reading
                     f"Encountered duplicate read in data for Snowflake: {key}"
                 )
             seen.add(key)
+            if oldest_flowtime is None or read.flowtime < oldest_flowtime:
+                oldest_flowtime = read.flowtime
+        return oldest_flowtime
 
     def calculate_end_of_backfill_range(
         self, org_id: str, min_date: datetime, max_date: datetime
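
Note (not part of the patch): below is a minimal, self-contained sketch of the duplicate-check-plus-oldest-flowtime logic the diff introduces, for illustration only. The Read dataclass and the module-level function name are stand-ins; the real code uses GeneralMeterRead and the _verify_no_duplicate_reads_and_return_oldest_flowtime method shown above, and reports the result via self.metrics.gauge after the MERGE succeeds.

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass(frozen=True)
class Read:
    # Stand-in for GeneralMeterRead; only the fields the helper touches.
    org_id: str
    device_id: str
    flowtime: datetime


def verify_no_duplicates_and_return_oldest_flowtime(reads: List[Read]) -> Optional[datetime]:
    # Mirrors the patched helper: raise on a repeated (org_id, device_id, flowtime)
    # key, and track the minimum flowtime seen so far.
    seen = set()
    oldest_flowtime = None
    for read in reads:
        key = (read.org_id, read.device_id, read.flowtime)
        if key in seen:
            raise ValueError(f"Encountered duplicate read in data for Snowflake: {key}")
        seen.add(key)
        if oldest_flowtime is None or read.flowtime < oldest_flowtime:
            oldest_flowtime = read.flowtime
    return oldest_flowtime


if __name__ == "__main__":
    reads = [
        Read("org1", "meterA", datetime(2026, 1, 18, 12, 0, tzinfo=timezone.utc)),
        Read("org1", "meterA", datetime(2026, 1, 17, 6, 0, tzinfo=timezone.utc)),
    ]
    oldest = verify_no_duplicates_and_return_oldest_flowtime(reads)
    assert oldest == datetime(2026, 1, 17, 6, 0, tzinfo=timezone.utc)
    # The patch emits this value as a Unix-timestamp gauge, int(oldest.timestamp()),
    # tagged with the org_id, only when at least one read was processed.
    print(int(oldest.timestamp()))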