Skip to content

Commit

Permalink
feat: add datadog monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
anaselmhamdi committed Jan 22, 2025
1 parent 9438223 commit ebcedf1
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,4 @@ profile_default/
ipython_config.py

hubspot_credentials.json
test-pipeline.yml
test-pipeline*.yml
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"name": "Kafka pipeline",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/bizon/sources/kafka/tests/kafka_pipeline.py",
"program": "${workspaceFolder}/bizon/connectors/sources/kafka/tests/kafka_pipeline.py",
"console": "integratedTerminal"
}
]
Expand Down
6 changes: 6 additions & 0 deletions bizon/common/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from bizon.connectors.destinations.file.src.config import FileDestinationConfig
from bizon.connectors.destinations.logger.src.config import LoggerConfig
from bizon.engine.config import EngineConfig
from bizon.monitoring.config import MonitoringConfig
from bizon.source.config import SourceConfig, SourceSyncModes
from bizon.transform.config import TransformModel

Expand Down Expand Up @@ -53,6 +54,11 @@ class BizonConfig(BaseModel):
default=None,
)

monitoring: Optional[MonitoringConfig] = Field(
description="Monitoring configuration",
default=None,
)


class SyncMetadata(BaseModel):
"""Model which stores general metadata around a sync.
Expand Down
4 changes: 1 addition & 3 deletions bizon/connectors/sources/kafka/tests/kafka_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,5 @@
from bizon.engine.engine import RunnerFactory

if __name__ == "__main__":
runner = RunnerFactory.create_from_yaml(
filepath=os.path.abspath("bizon/connectors/sources/kafka/config/kafka.example.yml")
)
runner = RunnerFactory.create_from_yaml(filepath=os.path.abspath("test-pipeline-monitoring.yml"))
runner.run()
15 changes: 14 additions & 1 deletion bizon/engine/pipeline/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@
AbstractQueueConfig,
QueueMessage,
)
from bizon.monitoring.client import Monitor
from bizon.transform.transform import Transform


class AbstractQueueConsumer(ABC):
def __init__(self, config: AbstractQueueConfig, destination: AbstractDestination, transform: Transform):
def __init__(
    self,
    config: AbstractQueueConfig,
    destination: AbstractDestination,
    transform: Transform,
    monitor: Monitor,
):
    """Bind the consumer to its queue configuration, destination, transform and monitor.

    Args:
        config: Queue-adapter configuration for this consumer.
        destination: Sink that source records are written to.
        transform: Transformation applied to records before they are written.
        monitor: Reports pipeline status metrics (e.g. to a Datadog agent).
    """
    self.config = config
    self.destination = destination
    self.transform = transform
    self.monitor = monitor

@abstractmethod
def run(self, stop_event: Union[multiprocessing.synchronize.Event, threading.Event]) -> PipelineReturnStatus:
Expand All @@ -35,6 +43,7 @@ def process_queue_message(self, queue_message: QueueMessage) -> PipelineReturnSt
except Exception as e:
logger.error(f"Error applying transformation: {e}")
logger.error(traceback.format_exc())
self.monitor.report_pipeline_status(PipelineReturnStatus.TRANSFORM_ERROR)
return PipelineReturnStatus.TRANSFORM_ERROR

# Handle last iteration
Expand All @@ -48,13 +57,16 @@ def process_queue_message(self, queue_message: QueueMessage) -> PipelineReturnSt
pagination=queue_message.pagination,
last_iteration=True,
)
self.monitor.report_pipeline_status(PipelineReturnStatus.SUCCESS)
return PipelineReturnStatus.SUCCESS

except Exception as e:
logger.error(f"Error writing records to destination: {e}")
self.monitor.report_pipeline_status(PipelineReturnStatus.DESTINATION_ERROR)
return PipelineReturnStatus.DESTINATION_ERROR

# Write the records to the destination
self.monitor.report_pipeline_status(PipelineReturnStatus.RUNNING)
try:
self.destination.write_records_and_update_cursor(
df_source_records=df_source_records,
Expand All @@ -66,6 +78,7 @@ def process_queue_message(self, queue_message: QueueMessage) -> PipelineReturnSt

except Exception as e:
logger.error(f"Error writing records to destination: {e}")
self.monitor.report_pipeline_status(PipelineReturnStatus.DESTINATION_ERROR)
return PipelineReturnStatus.DESTINATION_ERROR

raise RuntimeError("Should not reach this point")
16 changes: 14 additions & 2 deletions bizon/engine/queue/adapters/python_queue/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,27 @@
from bizon.engine.pipeline.models import PipelineReturnStatus
from bizon.engine.queue.config import QueueMessage
from bizon.engine.queue.queue import AbstractQueue
from bizon.monitoring.client import Monitor
from bizon.transform.transform import Transform

from .config import PythonQueueConfig


class PythonQueueConsumer(AbstractQueueConsumer):
def __init__(
self, config: PythonQueueConfig, queue: AbstractQueue, destination: AbstractDestination, transform: Transform
self,
config: PythonQueueConfig,
queue: AbstractQueue,
destination: AbstractDestination,
transform: Transform,
monitor: Monitor,
):
super().__init__(config, destination=destination, transform=transform)
super().__init__(
config,
destination=destination,
transform=transform,
monitor=monitor,
)
self.queue = queue

def run(self, stop_event: Union[threading.Event, multiprocessing.synchronize.Event]) -> PipelineReturnStatus:
Expand All @@ -29,6 +40,7 @@ def run(self, stop_event: Union[threading.Event, multiprocessing.synchronize.Eve
# Handle kill signal from the runner
if stop_event.is_set():
logger.info("Stop event is set, closing consumer ...")
self.monitor.report_pipeline_status(PipelineReturnStatus.KILLED_BY_RUNNER)
return PipelineReturnStatus.KILLED_BY_RUNNER

# Retrieve the message from the queue
Expand Down
9 changes: 7 additions & 2 deletions bizon/engine/queue/adapters/python_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from bizon.destination.destination import AbstractDestination
from bizon.engine.queue.config import QUEUE_TERMINATION, QueueMessage
from bizon.engine.queue.queue import AbstractQueue, AbstractQueueConsumer
from bizon.monitoring.client import Monitor
from bizon.source.models import SourceIteration
from bizon.transform.transform import Transform

Expand All @@ -28,8 +29,12 @@ def connect(self):
# No connection to establish for PythonQueue
pass

def get_consumer(self, destination: AbstractDestination, transform: Transform) -> AbstractQueueConsumer:
return PythonQueueConsumer(config=self.config, queue=self.queue, destination=destination, transform=transform)
def get_consumer(
    self, destination: AbstractDestination, transform: Transform, monitor: Monitor
) -> AbstractQueueConsumer:
    """Build a PythonQueueConsumer bound to this queue's config and in-memory queue."""
    return PythonQueueConsumer(
        config=self.config, queue=self.queue, destination=destination, transform=transform, monitor=monitor
    )

def put_queue_message(self, queue_message: QueueMessage):
if not self.queue.full():
Expand Down
8 changes: 5 additions & 3 deletions bizon/engine/queue/queue.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Union
from typing import Union

import polars as pl
from pytz import UTC

from bizon.destination.destination import AbstractDestination
from bizon.engine.pipeline.consumer import AbstractQueueConsumer
from bizon.monitoring.client import Monitor
from bizon.source.models import SourceIteration, source_record_schema
from bizon.transform.transform import Transform

Expand All @@ -30,7 +30,9 @@ def connect(self):
pass

@abstractmethod
def get_consumer(self, destination: AbstractDestination, transform: Transform) -> AbstractQueueConsumer:
def get_consumer(
    self, destination: AbstractDestination, transform: Transform, monitor: Monitor
) -> AbstractQueueConsumer:
    """Return a consumer that reads from this queue and writes to `destination`.

    Implementations receive the transform to apply to each batch of records and
    the monitor used to report pipeline status.
    """
    pass

@abstractmethod
Expand Down
13 changes: 12 additions & 1 deletion bizon/engine/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from bizon.engine.pipeline.producer import Producer
from bizon.engine.queue.queue import AbstractQueue, QueueFactory
from bizon.engine.runner.config import RunnerStatus
from bizon.monitoring.client import Monitor
from bizon.source.discover import get_source_instance_by_source_and_stream
from bizon.source.source import AbstractSource
from bizon.transform.transform import Transform
Expand Down Expand Up @@ -118,6 +119,11 @@ def get_transform(bizon_config: BizonConfig) -> Transform:
"""Return the transform instance to apply to the source records"""
return Transform(transforms=bizon_config.transforms)

@staticmethod
def get_monitoring_client(bizon_config: BizonConfig) -> Monitor:
    """Return the Monitor instance used to report pipeline status metrics."""
    return Monitor(bizon_config)

@staticmethod
def get_or_create_job(
bizon_config: BizonConfig,
Expand Down Expand Up @@ -231,8 +237,13 @@ def instanciate_and_run_consumer(
backend = AbstractRunner.get_backend(bizon_config=bizon_config, **kwargs)
destination = AbstractRunner.get_destination(bizon_config=bizon_config, backend=backend, job_id=job_id)
transform = AbstractRunner.get_transform(bizon_config=bizon_config)
monitor = AbstractRunner.get_monitoring_client(bizon_config=bizon_config)

consumer = queue.get_consumer(destination=destination, transform=transform)
consumer = queue.get_consumer(
destination=destination,
transform=transform,
monitor=monitor,
)

status = consumer.run(stop_event)
return status
Expand Down
Empty file added bizon/monitoring/__init__.py
Empty file.
38 changes: 38 additions & 0 deletions bizon/monitoring/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from datadog import initialize, statsd

from bizon.common.models import BizonConfig
from bizon.engine.pipeline.models import PipelineReturnStatus


class Monitor:
    """Reports pipeline lifecycle metrics to a Datadog agent via DogStatsD.

    If the pipeline config has no `monitoring` section (it is Optional on
    BizonConfig), the monitor becomes a no-op so callers can use it
    unconditionally.
    """

    def __init__(self, pipeline_config: BizonConfig):
        """Initialize the DogStatsD client from the pipeline's monitoring config.

        Args:
            pipeline_config: Full pipeline configuration; its optional
                `monitoring` section provides the Datadog agent host/port.
        """
        self.pipeline_config = pipeline_config

        # BizonConfig.monitoring is Optional with default None: without it,
        # disable reporting instead of crashing on attribute access below.
        self.enabled = pipeline_config.monitoring is not None

        if self.enabled:
            # In Kubernetes, set the host dynamically
            initialize(
                statsd_host=pipeline_config.monitoring.datadog_agent_host,
                statsd_port=pipeline_config.monitoring.datadog_agent_port,
            )

        # Metric names and the constant tag set attached to every data point.
        self.pipeline_monitor_status = "bizon_pipeline.status"
        self.tags = [
            f"name:{self.pipeline_config.name}",
            f"stream:{self.pipeline_config.source.stream}",
            f"source:{self.pipeline_config.source.name}",
            f"destination:{self.pipeline_config.destination.name}",
        ]

        self.pipeline_active_pipelines = "bizon_pipeline.active_pipelines"

    def report_pipeline_status(self, pipeline_status: PipelineReturnStatus) -> None:
        """Increment the status counter for this pipeline.

        Args:
            pipeline_status: Current status of the pipeline (e.g. RUNNING,
                SUCCESS, TRANSFORM_ERROR, DESTINATION_ERROR).
        """
        if not self.enabled:
            return

        # NOTE(review): if PipelineReturnStatus is a plain Enum this renders as
        # "PipelineReturnStatus.X"; confirm the intended tag format (`.value`?).
        statsd.increment(
            self.pipeline_monitor_status,
            tags=self.tags + [f"status:{pipeline_status}"],
        )
6 changes: 6 additions & 0 deletions bizon/monitoring/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from pydantic import BaseModel


class MonitoringConfig(BaseModel):
    """Settings for the Datadog monitoring integration."""

    # Hostname/IP of the Datadog agent receiving DogStatsD metrics (required).
    datadog_agent_host: str
    # DogStatsD UDP port; 8125 is the Datadog agent default.
    datadog_agent_port: int = 8125
24 changes: 19 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ snakeviz = "^2.1.2"
yappi = "^1.3.2"
pre-commit = "^3.8.0"


[tool.poetry.group.monitoring.dependencies]
datadog = "^0.50.2"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
Expand Down
22 changes: 20 additions & 2 deletions tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: '3.9'

services:
db:
image: postgres
Expand All @@ -24,3 +22,23 @@ services:
restart: always
ports:
- 8080:8080

otel-collector:
image: otel/opentelemetry-collector-contrib:0.117.0
volumes:
- ../bizon/monitoring/otel-collector-config.yml:/config.yml
command: --config /config.yml
ports:
- "1888:1888" # pprof extension
- "8888:8888" # Prometheus metrics exposed by the collector
- "8889:8889" # Prometheus exporter metrics
- "13133:13133" # health_check extension
- "4317:4317" # OTLP gRPC receiver

prometheus:
image: prom/prometheus
ports:
- 9090:9090
volumes:
- ../bizon/monitoring/prometheus.yml:/etc/prometheus.yml
command: --config.file=/etc/prometheus.yml

0 comments on commit ebcedf1

Please sign in to comment.