215 changes: 150 additions & 65 deletions amiadapters/adapters/base.py
@@ -68,6 +68,9 @@ def __init__(
configured_task_output_controller, org_id
)
self.metrics = Metrics.from_configuration(configured_metrics)
self._base_adapter_metrics = self._BaseAdapterMetrics(
self.metrics, self.org_id, type(self).__name__
)
self.storage_sinks = self._create_storage_sinks(
configured_sinks,
self.org_id,
@@ -99,13 +102,16 @@ def extract_and_output(
"""
Public function for extract stage.
"""
logger.info(
f"Extracting data for range {extract_range_start} to {extract_range_end}"
)
# Use adapter implementation to extract data
extracted_output = self._extract(run_id, extract_range_start, extract_range_end)
# Output to intermediate storage, e.g. S3 or local files
self.output_controller.write_extract_outputs(run_id, extracted_output)
with self._base_adapter_metrics.extract_timer():
logger.info(
f"Extracting data for range {extract_range_start} to {extract_range_end}"
)
# Use adapter implementation to extract data
extracted_output = self._extract(
run_id, extract_range_start, extract_range_end
)
# Output to intermediate storage, e.g. S3 or local files
self.output_controller.write_extract_outputs(run_id, extracted_output)

@abstractmethod
def _extract(
@@ -128,24 +134,25 @@ def transform_and_output(self, run_id: str):
"""
Public function for transform stage.
"""
extract_outputs = self.output_controller.read_extract_outputs(run_id)
transformed_meters, transformed_reads = self._transform(run_id, extract_outputs)
# TODO clean this up
self.metrics.incr(
"transform.meters_transformed",
len(transformed_meters),
tags={"org_id": self.org_id},
)
self.metrics.incr(
"transform.meter_reads_transformed",
len(transformed_reads),
tags={"org_id": self.org_id},
)
logger.info(
f"Transformed {len(transformed_meters)} meters for org {self.org_id}"
)
self.output_controller.write_transformed_meters(run_id, transformed_meters)
self.output_controller.write_transformed_meter_reads(run_id, transformed_reads)
with self._base_adapter_metrics.transform_timer():
# Read extract outputs from intermediate storage
extract_outputs = self.output_controller.read_extract_outputs(run_id)

# Transform
transformed_meters, transformed_reads = self._transform(
run_id, extract_outputs
)
self._base_adapter_metrics.mark_meters_transformed(len(transformed_meters))
self._base_adapter_metrics.mark_reads_transformed(len(transformed_reads))

# Write transformed outputs to intermediate storage
logger.info(
f"Transformed {len(transformed_meters)} meters for org {self.org_id}"
)
self.output_controller.write_transformed_meters(run_id, transformed_meters)
self.output_controller.write_transformed_meter_reads(
run_id, transformed_reads
)

@abstractmethod
def _transform(self, run_id: str, extract_outputs: ExtractOutput):
@@ -193,56 +200,65 @@ def load_raw(self, run_id: str):

:run_id: identifier for this run of the pipeline; used to find intermediate output files
"""
extract_outputs = self.output_controller.read_extract_outputs(run_id)
for sink in self.storage_sinks:
sink.store_raw(run_id, extract_outputs)
with self._base_adapter_metrics.load_raw_timer():
extract_outputs = self.output_controller.read_extract_outputs(run_id)
for sink in self.storage_sinks:
sink.store_raw(run_id, extract_outputs)

def load_transformed(self, run_id: str):
"""
Stores transformed data from transform step into all storage sinks.

:run_id: identifier for this run of the pipeline; used to find intermediate output files
"""
with self._base_adapter_metrics.load_transformed_timer():
meters = self.output_controller.read_transformed_meters(run_id)
reads = self.output_controller.read_transformed_meter_reads(run_id)
for sink in self.storage_sinks:
sink.store_transformed(run_id, meters, reads)

def post_process(
self,
run_id: str,
extract_range_start: datetime,
extract_range_end: datetime,
):
# Sink post processing, e.g. leak detection queries
if self.pipeline_configuration.should_run_post_processor:
logger.info("Running sink post processor")
for sink in self.storage_sinks:
sink_post_process_end_date = datetime.today()
sink_post_process_start_date = sink_post_process_end_date - timedelta(
days=30
)
logger.info(
f"Running post processor for sink {sink.__class__.__name__} from {sink_post_process_start_date} to {sink_post_process_end_date}"
)
sink.exec_postprocessor(
run_id, sink_post_process_start_date, sink_post_process_end_date
)
else:
logger.info("Skipping sink post processor as configured")

# Publish event saying we finished loading data
if self.pipeline_configuration.should_publish_load_finished_events:
logger.info("Publishing load finished event")
event_publisher = EventPublisher()
event_publisher.publish_load_finished_event(
run_id=run_id,
org_id=self.org_id,
start_date=extract_range_start,
end_date=extract_range_end,
)
else:
logger.info("Skipping load finished event publication as configured")

def load_transformed(self, run_id: str):
"""
Stores transformed data from transform step into all storage sinks.
Post-processing step after loading data into storage sinks. Includes
sink-specific post-processing, e.g. queries that run on the loaded data to refresh downstream tables.

:run_id: identifier for this run of the pipeline; used to find intermediate output files
Can also be configured to publish a load-finished event to a message queue.
"""
meters = self.output_controller.read_transformed_meters(run_id)
reads = self.output_controller.read_transformed_meter_reads(run_id)
for sink in self.storage_sinks:
sink.store_transformed(run_id, meters, reads)
with self._base_adapter_metrics.post_process_timer():
# Sink post processing, e.g. leak detection queries
if self.pipeline_configuration.should_run_post_processor:
logger.info("Running sink post processor")
for sink in self.storage_sinks:
sink_post_process_end_date = datetime.today()
sink_post_process_start_date = (
sink_post_process_end_date - timedelta(days=30)
)
logger.info(
f"Running post processor for sink {sink.__class__.__name__} from {sink_post_process_start_date} to {sink_post_process_end_date}"
)
sink.exec_postprocessor(
run_id, sink_post_process_start_date, sink_post_process_end_date
)
else:
logger.info("Skipping sink post processor as configured")

# Publish event saying we finished loading data
if self.pipeline_configuration.should_publish_load_finished_events:
logger.info("Publishing load finished event")
event_publisher = EventPublisher()
event_publisher.publish_load_finished_event(
run_id=run_id,
org_id=self.org_id,
start_date=extract_range_start,
end_date=extract_range_end,
)
else:
logger.info("Skipping load finished event publication as configured")

def datetime_from_iso_str(
self, datetime_str: str, org_timezone: DstTzInfo
@@ -433,6 +449,75 @@ def _create_storage_sinks(
)
return result

class _BaseAdapterMetrics:
"""
Helper class for tracking metrics relevant to all adapters.
"""

def __init__(self, metrics: Metrics, org_id: str, adapter_type: str):
self.metrics = metrics
self.org_id = org_id
self.adapter_type = adapter_type

def extract_timer(self):
return self.metrics.timed_task(
"adapter.extract.duration_seconds",
tags={
"org_id": self.org_id,
"adapter_type": self.adapter_type,
},
)

def transform_timer(self):
return self.metrics.timed_task(
"adapter.transform.duration_seconds",
tags={
"org_id": self.org_id,
"adapter_type": self.adapter_type,
},
)

def mark_meters_transformed(self, count: int):
self.metrics.incr(
"adapter.transform.meters_transformed",
count,
tags={"org_id": self.org_id, "adapter_type": self.adapter_type},
)

def mark_reads_transformed(self, count: int):
self.metrics.incr(
"adapter.transform.reads_transformed",
count,
tags={"org_id": self.org_id, "adapter_type": self.adapter_type},
)

def load_raw_timer(self):
return self.metrics.timed_task(
"adapter.load_raw.duration_seconds",
tags={
"org_id": self.org_id,
"adapter_type": self.adapter_type,
},
)

def load_transformed_timer(self):
return self.metrics.timed_task(
"adapter.load_transformed.duration_seconds",
tags={
"org_id": self.org_id,
"adapter_type": self.adapter_type,
},
)

def post_process_timer(self):
return self.metrics.timed_task(
"adapter.post_process.duration_seconds",
tags={
"org_id": self.org_id,
"adapter_type": self.adapter_type,
},
)


class GeneralMeterUnitOfMeasure:
"""
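For context, here is a runnable sketch of how the new helper composes at call time. `RecordingMetrics`, the condensed `BaseAdapterMetricsSketch`, and the org/adapter names are illustrative stand-ins rather than code from this repo; the metric names and tag keys mirror the diff above, and the repo's actual `timed_task` is the one added in `amiadapters/metrics/base.py` below.

```python
import time
from contextlib import contextmanager


class RecordingMetrics:
    """Illustrative stand-in for the repo's Metrics facade; prints what would be emitted."""

    def incr(self, name, count, tags=None):
        print(f"incr   {name} +{count} {tags}")

    @contextmanager
    def timed_task(self, name, tags):
        # Simplified timer; the real one (below) also tags success/failure.
        start = time.monotonic()
        try:
            yield
        finally:
            print(f"timing {name} {time.monotonic() - start:.3f}s {tags}")


class BaseAdapterMetricsSketch:
    """Condensed form of the diff's _BaseAdapterMetrics: shared tags, one timer per stage."""

    def __init__(self, metrics, org_id, adapter_type):
        self.metrics = metrics
        self.tags = {"org_id": org_id, "adapter_type": adapter_type}

    def transform_timer(self):
        return self.metrics.timed_task("adapter.transform.duration_seconds", self.tags)

    def mark_meters_transformed(self, count):
        self.metrics.incr("adapter.transform.meters_transformed", count, tags=self.tags)

    def mark_reads_transformed(self, count):
        self.metrics.incr("adapter.transform.reads_transformed", count, tags=self.tags)


# Hypothetical org and adapter names, for illustration only.
adapter_metrics = BaseAdapterMetricsSketch(RecordingMetrics(), "org_123", "ExampleAdapter")
with adapter_metrics.transform_timer():
    meters, reads = ["m1", "m2"], ["r1", "r2", "r3"]  # stand-in for _transform() output
    adapter_metrics.mark_meters_transformed(len(meters))
    adapter_metrics.mark_reads_transformed(len(reads))
```

Building the tag set in one place is what gives every stage timer and counter the same `org_id`/`adapter_type` dimensions, which is the point of factoring this out of the ad-hoc `incr` calls removed above.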
19 changes: 19 additions & 0 deletions amiadapters/metrics/base.py
@@ -1,4 +1,6 @@
from abc import ABC, abstractmethod
from contextlib import contextmanager
import time
from typing import Mapping, Optional
from datetime import datetime, timezone

@@ -93,6 +95,23 @@ def gauge(self, name, value, tags=None):
def timing(self, name, value_seconds, tags=None):
self._backend.timing(name, value_seconds, tags)

@contextmanager
def timed_task(self, name, tags):
start = time.monotonic()
try:
yield
success = True
except Exception:
success = False
raise
finally:
duration = time.monotonic() - start
self.timing(
name,
duration,
tags={**tags, "success": str(success).lower()},
)


class CloudWatchMetricsBackend(MetricsBackend):
"""
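A minimal standalone sketch of `timed_task`'s behavior on both code paths, assuming only the `timing()` signature shown above; `DemoMetrics` and the literal org/metric values are illustrative. The duration is emitted in `finally`, so a failing task still records its timing, tagged `success=false`, before the exception propagates to the caller.

```python
import time
from contextlib import contextmanager


class DemoMetrics:
    """Illustrative emitter; the real Metrics delegates timing() to a configured backend."""

    def timing(self, name, value_seconds, tags=None):
        print(f"{name} {value_seconds:.4f}s {tags}")

    @contextmanager
    def timed_task(self, name, tags):
        # Same logic as the diff: time the block, tag success/failure, re-raise.
        start = time.monotonic()
        try:
            yield
            success = True
        except Exception:
            success = False
            raise
        finally:
            self.timing(
                name,
                time.monotonic() - start,
                tags={**tags, "success": str(success).lower()},
            )


m = DemoMetrics()

# Happy path: emits the timing with success=true.
with m.timed_task("adapter.extract.duration_seconds", tags={"org_id": "org_123"}):
    time.sleep(0.01)

# Failure path: the timing is still emitted (success=false) before the error propagates.
try:
    with m.timed_task("adapter.extract.duration_seconds", tags={"org_id": "org_123"}):
        raise RuntimeError("extract failed")
except RuntimeError:
    pass
```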