Skip to content

Commit

Permalink
feat: add datadog monitoring (#14)
Browse files Browse the repository at this point in the history
* feat: add datadog monitoring

* chore: remove prometheus and otel

* chore: handle no datadog

* chore: lint
  • Loading branch information
anaselmhamdi authored Jan 23, 2025
1 parent 9438223 commit a47c4e0
Show file tree
Hide file tree
Showing 19 changed files with 194 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,4 @@ profile_default/
ipython_config.py

hubspot_credentials.json
test-pipeline.yml
test-pipeline*.yml
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"name": "Kafka pipeline",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/bizon/sources/kafka/tests/kafka_pipeline.py",
"program": "${workspaceFolder}/bizon/connectors/sources/kafka/tests/kafka_pipeline.py",
"console": "integratedTerminal"
}
]
Expand Down
6 changes: 6 additions & 0 deletions bizon/common/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from bizon.connectors.destinations.file.src.config import FileDestinationConfig
from bizon.connectors.destinations.logger.src.config import LoggerConfig
from bizon.engine.config import EngineConfig
from bizon.monitoring.config import MonitoringConfig
from bizon.source.config import SourceConfig, SourceSyncModes
from bizon.transform.config import TransformModel

Expand Down Expand Up @@ -53,6 +54,11 @@ class BizonConfig(BaseModel):
default=None,
)

monitoring: Optional[MonitoringConfig] = Field(
description="Monitoring configuration",
default=None,
)


class SyncMetadata(BaseModel):
"""Model which stores general metadata around a sync.
Expand Down
4 changes: 1 addition & 3 deletions bizon/connectors/sources/kafka/tests/kafka_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,5 @@
from bizon.engine.engine import RunnerFactory

if __name__ == "__main__":
    # Run the committed example config. Do NOT point this at local
    # test-pipeline*.yml files: they are gitignored (see .gitignore hunk)
    # and will not exist for other developers or in CI.
    runner = RunnerFactory.create_from_yaml(
        filepath=os.path.abspath("bizon/connectors/sources/kafka/config/kafka.example.yml")
    )
    runner.run()
14 changes: 13 additions & 1 deletion bizon/engine/pipeline/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@
AbstractQueueConfig,
QueueMessage,
)
from bizon.monitoring.monitor import AbstractMonitor
from bizon.transform.transform import Transform


class AbstractQueueConsumer(ABC):
def __init__(self, config: AbstractQueueConfig, destination: AbstractDestination, transform: Transform):
def __init__(
self,
config: AbstractQueueConfig,
destination: AbstractDestination,
transform: Transform,
monitor: AbstractMonitor,
):
self.config = config
self.destination = destination
self.transform = transform
self.monitor = monitor

@abstractmethod
def run(self, stop_event: Union[multiprocessing.synchronize.Event, threading.Event]) -> PipelineReturnStatus:
Expand All @@ -35,6 +43,7 @@ def process_queue_message(self, queue_message: QueueMessage) -> PipelineReturnSt
except Exception as e:
logger.error(f"Error applying transformation: {e}")
logger.error(traceback.format_exc())
self.monitor.track_pipeline_status(PipelineReturnStatus.TRANSFORM_ERROR)
return PipelineReturnStatus.TRANSFORM_ERROR

# Handle last iteration
Expand All @@ -48,10 +57,12 @@ def process_queue_message(self, queue_message: QueueMessage) -> PipelineReturnSt
pagination=queue_message.pagination,
last_iteration=True,
)
self.monitor.track_pipeline_status(PipelineReturnStatus.SUCCESS)
return PipelineReturnStatus.SUCCESS

except Exception as e:
logger.error(f"Error writing records to destination: {e}")
self.monitor.track_pipeline_status(PipelineReturnStatus.DESTINATION_ERROR)
return PipelineReturnStatus.DESTINATION_ERROR

# Write the records to the destination
Expand All @@ -66,6 +77,7 @@ def process_queue_message(self, queue_message: QueueMessage) -> PipelineReturnSt

except Exception as e:
logger.error(f"Error writing records to destination: {e}")
self.monitor.track_pipeline_status(PipelineReturnStatus.DESTINATION_ERROR)
return PipelineReturnStatus.DESTINATION_ERROR

raise RuntimeError("Should not reach this point")
18 changes: 15 additions & 3 deletions bizon/engine/queue/adapters/python_queue/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,37 @@
from bizon.engine.pipeline.models import PipelineReturnStatus
from bizon.engine.queue.config import QueueMessage
from bizon.engine.queue.queue import AbstractQueue
from bizon.monitoring.monitor import AbstractMonitor
from bizon.transform.transform import Transform

from .config import PythonQueueConfig


class PythonQueueConsumer(AbstractQueueConsumer):
def __init__(
self, config: PythonQueueConfig, queue: AbstractQueue, destination: AbstractDestination, transform: Transform
self,
config: PythonQueueConfig,
queue: AbstractQueue,
destination: AbstractDestination,
transform: Transform,
monitor: AbstractMonitor,
):
super().__init__(config, destination=destination, transform=transform)
super().__init__(
config,
destination=destination,
transform=transform,
monitor=monitor,
)
self.queue = queue
self.monitor.track_pipeline_status(PipelineReturnStatus.RUNNING)

def run(self, stop_event: Union[threading.Event, multiprocessing.synchronize.Event]) -> PipelineReturnStatus:

while True:

# Handle kill signal from the runner
if stop_event.is_set():
logger.info("Stop event is set, closing consumer ...")
self.monitor.track_pipeline_status(PipelineReturnStatus.KILLED_BY_RUNNER)
return PipelineReturnStatus.KILLED_BY_RUNNER

# Retrieve the message from the queue
Expand Down
9 changes: 7 additions & 2 deletions bizon/engine/queue/adapters/python_queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from bizon.destination.destination import AbstractDestination
from bizon.engine.queue.config import QUEUE_TERMINATION, QueueMessage
from bizon.engine.queue.queue import AbstractQueue, AbstractQueueConsumer
from bizon.monitoring.monitor import AbstractMonitor
from bizon.source.models import SourceIteration
from bizon.transform.transform import Transform

Expand All @@ -28,8 +29,12 @@ def connect(self):
# No connection to establish for PythonQueue
pass

def get_consumer(self, destination: AbstractDestination, transform: Transform) -> AbstractQueueConsumer:
return PythonQueueConsumer(config=self.config, queue=self.queue, destination=destination, transform=transform)
def get_consumer(
self, destination: AbstractDestination, transform: Transform, monitor: AbstractMonitor
) -> AbstractQueueConsumer:
return PythonQueueConsumer(
config=self.config, queue=self.queue, destination=destination, transform=transform, monitor=monitor
)

def put_queue_message(self, queue_message: QueueMessage):
if not self.queue.full():
Expand Down
11 changes: 8 additions & 3 deletions bizon/engine/queue/queue.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Union
from typing import Union

import polars as pl
from pytz import UTC

from bizon.destination.destination import AbstractDestination
from bizon.engine.pipeline.consumer import AbstractQueueConsumer
from bizon.monitoring.monitor import AbstractMonitor
from bizon.source.models import SourceIteration, source_record_schema
from bizon.transform.transform import Transform

Expand All @@ -30,7 +30,12 @@ def connect(self):
pass

@abstractmethod
def get_consumer(self, destination: AbstractDestination, transform: Transform) -> AbstractQueueConsumer:
def get_consumer(
self,
destination: AbstractDestination,
transform: Transform,
monitor: AbstractMonitor,
) -> AbstractQueueConsumer:
pass

@abstractmethod
Expand Down
13 changes: 12 additions & 1 deletion bizon/engine/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from bizon.engine.pipeline.producer import Producer
from bizon.engine.queue.queue import AbstractQueue, QueueFactory
from bizon.engine.runner.config import RunnerStatus
from bizon.monitoring.monitor import AbstractMonitor, MonitorFactory
from bizon.source.discover import get_source_instance_by_source_and_stream
from bizon.source.source import AbstractSource
from bizon.transform.transform import Transform
Expand Down Expand Up @@ -118,6 +119,11 @@ def get_transform(bizon_config: BizonConfig) -> Transform:
"""Return the transform instance to apply to the source records"""
return Transform(transforms=bizon_config.transforms)

@staticmethod
def get_monitoring_client(bizon_config: BizonConfig) -> AbstractMonitor:
    """Return the monitoring client instance for this pipeline.

    Delegates to MonitorFactory, which picks the backend from
    ``bizon_config.monitoring`` (a no-op monitor when that section is unset).
    """
    return MonitorFactory.get_monitor(bizon_config)

@staticmethod
def get_or_create_job(
bizon_config: BizonConfig,
Expand Down Expand Up @@ -231,8 +237,13 @@ def instanciate_and_run_consumer(
backend = AbstractRunner.get_backend(bizon_config=bizon_config, **kwargs)
destination = AbstractRunner.get_destination(bizon_config=bizon_config, backend=backend, job_id=job_id)
transform = AbstractRunner.get_transform(bizon_config=bizon_config)
monitor = AbstractRunner.get_monitoring_client(bizon_config=bizon_config)

consumer = queue.get_consumer(destination=destination, transform=transform)
consumer = queue.get_consumer(
destination=destination,
transform=transform,
monitor=monitor,
)

status = consumer.run(stop_event)
return status
Expand Down
Empty file added bizon/monitoring/__init__.py
Empty file.
18 changes: 18 additions & 0 deletions bizon/monitoring/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from enum import Enum
from typing import Optional

from pydantic import BaseModel


class MonitorType(str, Enum):
    """Supported monitoring backends (str-valued so it parses directly from YAML/JSON config)."""

    DATADOG = "datadog"


class DatadogConfig(BaseModel):
    """Connection settings for the Datadog (DogStatsD) agent."""

    datadog_agent_host: str  # host running the DogStatsD agent (in k8s, typically injected per-node)
    datadog_agent_port: int = 8125  # 8125 is the DogStatsD default UDP port


class MonitoringConfig(BaseModel):
    """Top-level ``monitoring`` section of a pipeline config."""

    type: MonitorType
    # Backend-specific settings. NOTE(review): this may be None even when
    # type=datadog — DatadogMonitor dereferences .config; confirm validation
    # enforces presence for backends that need it.
    config: Optional[DatadogConfig] = None
Empty file.
43 changes: 43 additions & 0 deletions bizon/monitoring/datadog/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from datadog import initialize, statsd
from loguru import logger

from bizon.common.models import BizonConfig
from bizon.engine.pipeline.models import PipelineReturnStatus
from bizon.monitoring.monitor import AbstractMonitor


class DatadogMonitor(AbstractMonitor):
    """Monitor that emits pipeline status metrics to a Datadog (DogStatsD) agent."""

    def __init__(self, pipeline_config: BizonConfig):
        """Initialize the DogStatsD client from the pipeline's monitoring config.

        Monitoring is best-effort: initialization problems are logged and
        never break the pipeline.
        """
        super().__init__(pipeline_config)

        monitoring_config = pipeline_config.monitoring.config if pipeline_config.monitoring else None
        if monitoring_config is None:
            # Misconfiguration: type=datadog but no agent settings. Previously
            # this surfaced as a swallowed AttributeError; make it explicit.
            logger.warning("Datadog monitoring enabled but no agent config provided; using statsd defaults.")
        else:
            # In Kubernetes, the agent host is set dynamically per node.
            try:
                initialize(
                    statsd_host=monitoring_config.datadog_agent_host,
                    statsd_port=monitoring_config.datadog_agent_port,
                )
            except Exception as e:
                # Warning, not info: a failed init means metrics silently go nowhere.
                logger.warning(f"Failed to initialize Datadog agent: {e}")

        # Metric names.
        self.pipeline_monitor_status = "bizon_pipeline.status"
        self.pipeline_active_pipelines = "bizon_pipeline.active_pipelines"

        # Tags attached to every metric emitted for this pipeline.
        self.tags = [
            f"name:{self.pipeline_config.name}",
            f"stream:{self.pipeline_config.source.stream}",
            f"source:{self.pipeline_config.source.name}",
            f"destination:{self.pipeline_config.destination.name}",
        ]

    def track_pipeline_status(self, pipeline_status: PipelineReturnStatus) -> None:
        """Increment the status counter, tagging it with the current status.

        Args:
            pipeline_status: Current status of the pipeline
                (e.g. RUNNING, SUCCESS, TRANSFORM_ERROR).
        """
        # NOTE(review): tag rendering assumes PipelineReturnStatus formats as
        # its value in f-strings (i.e. a str-based Enum) — confirm, otherwise
        # tags come out as "status:PipelineReturnStatus.SUCCESS".
        statsd.increment(
            self.pipeline_monitor_status,
            tags=self.tags + [f"status:{pipeline_status}"],
        )
35 changes: 35 additions & 0 deletions bizon/monitoring/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from abc import ABC, abstractmethod

from bizon.common.models import BizonConfig
from bizon.engine.pipeline.models import PipelineReturnStatus
from bizon.monitoring.config import MonitorType


class AbstractMonitor(ABC):
    """Base interface for pipeline monitoring backends."""

    def __init__(self, pipeline_config: BizonConfig):
        # Keep the full pipeline config so implementations can derive
        # metric tags (pipeline name, source, destination, ...) from it.
        self.pipeline_config = pipeline_config
        # Initialize the monitor

    @abstractmethod
    def track_pipeline_status(self, pipeline_status: PipelineReturnStatus) -> None:
        """Record the current status of the pipeline.

        Args:
            pipeline_status: Current status of the pipeline
                (e.g. RUNNING, SUCCESS, TRANSFORM_ERROR).
        """
        pass


class MonitorFactory:
    """Builds the concrete monitor implementation for a pipeline config."""

    @staticmethod
    def get_monitor(pipeline_config: BizonConfig) -> AbstractMonitor:
        """Return the monitor matching ``pipeline_config.monitoring``.

        Falls back to a no-op monitor when the monitoring section is unset.
        Imports are kept local so optional backend dependencies (e.g. the
        ``datadog`` package) are only loaded when actually configured.

        Raises:
            ValueError: If the configured monitor type is not supported.
        """
        if pipeline_config.monitoring is None:
            from bizon.monitoring.noop.monitor import NoOpMonitor

            return NoOpMonitor(pipeline_config)

        if pipeline_config.monitoring.type == MonitorType.DATADOG:
            from bizon.monitoring.datadog.monitor import DatadogMonitor

            return DatadogMonitor(pipeline_config)

        # Previously this fell through and implicitly returned None, which
        # would only crash later at the first monitor call site.
        raise ValueError(f"Unsupported monitoring type: {pipeline_config.monitoring.type}")
Empty file.
11 changes: 11 additions & 0 deletions bizon/monitoring/noop/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from bizon.common.models import BizonConfig
from bizon.engine.pipeline.models import PipelineReturnStatus
from bizon.monitoring.monitor import AbstractMonitor


class NoOpMonitor(AbstractMonitor):
    """Monitoring backend that silently discards every event.

    Used when the pipeline config has no ``monitoring`` section.
    """

    def __init__(self, pipeline_config: BizonConfig):
        # No state beyond what the base class stores.
        super().__init__(pipeline_config)

    def track_pipeline_status(self, pipeline_status: PipelineReturnStatus) -> None:
        """Deliberately do nothing."""
24 changes: 19 additions & 5 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ snakeviz = "^2.1.2"
yappi = "^1.3.2"
pre-commit = "^3.8.0"


[tool.poetry.group.monitoring.dependencies]
datadog = "^0.50.2"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
Expand Down
Loading

0 comments on commit a47c4e0

Please sign in to comment.