diff --git a/amiadapters/adapters/base.py b/amiadapters/adapters/base.py index d5359ff..84d20cd 100644 --- a/amiadapters/adapters/base.py +++ b/amiadapters/adapters/base.py @@ -68,6 +68,9 @@ def __init__( configured_task_output_controller, org_id ) self.metrics = Metrics.from_configuration(configured_metrics) + self._base_adapter_metrics = self._BaseAdapterMetrics( + self.metrics, self.org_id, type(self).__name__ + ) self.storage_sinks = self._create_storage_sinks( configured_sinks, self.org_id, @@ -99,13 +102,16 @@ def extract_and_output( """ Public function for extract stage. """ - logger.info( - f"Extracting data for range {extract_range_start} to {extract_range_end}" - ) - # Use adapter implementation to extract data - extracted_output = self._extract(run_id, extract_range_start, extract_range_end) - # Output to intermediate storage, e.g. S3 or local files - self.output_controller.write_extract_outputs(run_id, extracted_output) + with self._base_adapter_metrics.extract_timer(): + logger.info( + f"Extracting data for range {extract_range_start} to {extract_range_end}" + ) + # Use adapter implementation to extract data + extracted_output = self._extract( + run_id, extract_range_start, extract_range_end + ) + # Output to intermediate storage, e.g. S3 or local files + self.output_controller.write_extract_outputs(run_id, extracted_output) @abstractmethod def _extract( @@ -128,24 +134,25 @@ def transform_and_output(self, run_id: str): """ Public function for transform stage. """ - extract_outputs = self.output_controller.read_extract_outputs(run_id) - transformed_meters, transformed_reads = self._transform(run_id, extract_outputs) - # TODO clean this up - self.metrics.incr( - "transform.meters_transformed", - len(transformed_meters), - tags={"org_id": self.org_id}, - ) - self.metrics.incr( - "transform.meter_reads_transformed", - len(transformed_reads), - tags={"org_id": self.org_id}, - ) - logger.info( - f"Transformed {len(transformed_meters)} meters for org {self.org_id}" - ) - self.output_controller.write_transformed_meters(run_id, transformed_meters) - self.output_controller.write_transformed_meter_reads(run_id, transformed_reads) + with self._base_adapter_metrics.transform_timer(): + # Read extract outputs from intermediate storage + extract_outputs = self.output_controller.read_extract_outputs(run_id) + + # Transform + transformed_meters, transformed_reads = self._transform( + run_id, extract_outputs + ) + self._base_adapter_metrics.mark_meters_transformed(len(transformed_meters)) + self._base_adapter_metrics.mark_reads_transformed(len(transformed_reads)) + + # Write transformed outputs to intermediate storage + logger.info( + f"Transformed {len(transformed_meters)} meters for org {self.org_id}" + ) + self.output_controller.write_transformed_meters(run_id, transformed_meters) + self.output_controller.write_transformed_meter_reads( + run_id, transformed_reads + ) @abstractmethod def _transform(self, run_id: str, extract_outputs: ExtractOutput): @@ -193,9 +200,22 @@ def load_raw(self, run_id: str): :run_id: identifier for this run of the pipeline, is used to find intermediate output files """ - extract_outputs = self.output_controller.read_extract_outputs(run_id) - for sink in self.storage_sinks: - sink.store_raw(run_id, extract_outputs) + with self._base_adapter_metrics.load_raw_timer(): + extract_outputs = self.output_controller.read_extract_outputs(run_id) + for sink in self.storage_sinks: + sink.store_raw(run_id, extract_outputs) + + def load_transformed(self, run_id: str): + """ + 
Stores transformed data from transform step into all storage sinks. + + :run_id: identifier for this run of the pipeline, is used to find intermediate output files + """ + with self._base_adapter_metrics.load_transformed_timer(): + meters = self.output_controller.read_transformed_meters(run_id) + reads = self.output_controller.read_transformed_meter_reads(run_id) + for sink in self.storage_sinks: + sink.store_transformed(run_id, meters, reads) def post_process( self, @@ -203,46 +223,42 @@ def post_process( extract_range_start: datetime, extract_range_end: datetime, ): - # Sink post processing, e.g. leak detection queries - if self.pipeline_configuration.should_run_post_processor: - logger.info("Running sink post processor") - for sink in self.storage_sinks: - sink_post_process_end_date = datetime.today() - sink_post_process_start_date = sink_post_process_end_date - timedelta( - days=30 - ) - logger.info( - f"Running post processor for sink {sink.__class__.__name__} from {sink_post_process_start_date} to {sink_post_process_end_date}" - ) - sink.exec_postprocessor( - run_id, sink_post_process_start_date, sink_post_process_end_date - ) - else: - logger.info("Skipping sink post processor as configured") - - # Publish event saying we finished loading data - if self.pipeline_configuration.should_publish_load_finished_events: - logger.info("Publishing load finished event") - event_publisher = EventPublisher() - event_publisher.publish_load_finished_event( - run_id=run_id, - org_id=self.org_id, - start_date=extract_range_start, - end_date=extract_range_end, - ) - else: - logger.info("Skipping load finished event publication as configured") - - def load_transformed(self, run_id: str): """ - Stores transformed data from transform step into all storage sinks. + Post processing step after loading data into storage sinks. Includes + sink-specific post processing, e.g. queries that run on the loaded data to refresh downstream tables. - :run_id: identifier for this run of the pipeline, is used to find intermediate output files + Also can be configured to publish an event to a message queue saying we finished loading data. """ - meters = self.output_controller.read_transformed_meters(run_id) - reads = self.output_controller.read_transformed_meter_reads(run_id) - for sink in self.storage_sinks: - sink.store_transformed(run_id, meters, reads) + with self._base_adapter_metrics.post_process_timer(): + # Sink post processing, e.g. 
leak detection queries + if self.pipeline_configuration.should_run_post_processor: + logger.info("Running sink post processor") + for sink in self.storage_sinks: + sink_post_process_end_date = datetime.today() + sink_post_process_start_date = ( + sink_post_process_end_date - timedelta(days=30) + ) + logger.info( + f"Running post processor for sink {sink.__class__.__name__} from {sink_post_process_start_date} to {sink_post_process_end_date}" + ) + sink.exec_postprocessor( + run_id, sink_post_process_start_date, sink_post_process_end_date + ) + else: + logger.info("Skipping sink post processor as configured") + + # Publish event saying we finished loading data + if self.pipeline_configuration.should_publish_load_finished_events: + logger.info("Publishing load finished event") + event_publisher = EventPublisher() + event_publisher.publish_load_finished_event( + run_id=run_id, + org_id=self.org_id, + start_date=extract_range_start, + end_date=extract_range_end, + ) + else: + logger.info("Skipping load finished event publication as configured") def datetime_from_iso_str( self, datetime_str: str, org_timezone: DstTzInfo @@ -433,6 +449,75 @@ def _create_storage_sinks( ) return result + class _BaseAdapterMetrics: + """ + Helper class for tracking metrics relevant to all adapters. + """ + + def __init__(self, metrics: Metrics, org_id: str, adapter_type: str): + self.metrics = metrics + self.org_id = org_id + self.adapter_type = adapter_type + + def extract_timer(self): + return self.metrics.timed_task( + "adapter.extract.duration_seconds", + tags={ + "org_id": self.org_id, + "adapter_type": self.adapter_type, + }, + ) + + def transform_timer(self): + return self.metrics.timed_task( + "adapter.transform.duration_seconds", + tags={ + "org_id": self.org_id, + "adapter_type": self.adapter_type, + }, + ) + + def mark_meters_transformed(self, count: int): + self.metrics.incr( + "adapter.transform.meters_transformed", + count, + tags={"org_id": self.org_id, "adapter_type": self.adapter_type}, + ) + + def mark_reads_transformed(self, count: int): + self.metrics.incr( + "adapter.transform.reads_transformed", + count, + tags={"org_id": self.org_id, "adapter_type": self.adapter_type}, + ) + + def load_raw_timer(self): + return self.metrics.timed_task( + "adapter.load_raw.duration_seconds", + tags={ + "org_id": self.org_id, + "adapter_type": self.adapter_type, + }, + ) + + def load_transformed_timer(self): + return self.metrics.timed_task( + "adapter.load_transformed.duration_seconds", + tags={ + "org_id": self.org_id, + "adapter_type": self.adapter_type, + }, + ) + + def post_process_timer(self): + return self.metrics.timed_task( + "adapter.post_process.duration_seconds", + tags={ + "org_id": self.org_id, + "adapter_type": self.adapter_type, + }, + ) + class GeneralMeterUnitOfMeasure: """ diff --git a/amiadapters/metrics/base.py b/amiadapters/metrics/base.py index fe805d3..5f84ea2 100644 --- a/amiadapters/metrics/base.py +++ b/amiadapters/metrics/base.py @@ -1,4 +1,6 @@ from abc import ABC, abstractmethod +from contextlib import contextmanager +import time from typing import Mapping, Optional from datetime import datetime, timezone @@ -93,6 +95,23 @@ def gauge(self, name, value, tags=None): def timing(self, name, value_seconds, tags=None): self._backend.timing(name, value_seconds, tags) + @contextmanager + def timed_task(self, name, tags): + start = time.monotonic() + try: + yield + success = True + except Exception: + success = False + raise + finally: + duration = time.monotonic() - start + self.timing( + 
name, + duration, + tags={**tags, "success": str(success).lower()}, + ) + class CloudWatchMetricsBackend(MetricsBackend): """
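
For reference, a minimal usage sketch of the new Metrics.timed_task context manager added above. It assumes a Metrics instance (called metrics here) already built via Metrics.from_configuration, plus a placeholder do_extract() standing in for the timed work; the metric name and tags mirror what _BaseAdapterMetrics.extract_timer passes in. Because the timing call sits in the finally block, a duration is reported with a success=false tag even when the wrapped code raises.

    # Sketch only: metrics and do_extract are stand-ins, not part of the patch.
    with metrics.timed_task(
        "adapter.extract.duration_seconds",
        tags={"org_id": "org-123", "adapter_type": "MyUtilityAdapter"},
    ):
        do_extract()  # any exception propagates after the duration metric is emitted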