Skip to content

Commit

Permalink
allow customizable collection interval for collection and sharded data metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
lu-zhengda committed Nov 20, 2024
1 parent 7b18212 commit 8d2bc1e
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 5 deletions.
25 changes: 25 additions & 0 deletions mongo/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,31 @@ files:
type: string
example:
[metrics.commands, tcmalloc, top, collection, jumbo_chunks, sharded_data_distribution]
- name: metrics_collection_interval
description: |
The interval in seconds at which to collect certain types of metrics.
options:
- name: collection
description: |
The interval in seconds at which to collect collection metrics.
Only applicable when `collection` is added to `additional_metrics`.
value:
type: integer
example: 15
- name: collections_indexes_stats
description: |
The interval in seconds at which to collect collection indexes stats metrics.
Only applicable when `collections_indexes_stats` is set to `true`.
value:
type: integer
example: 15
- name: sharded_data_distribution
description: |
The interval in seconds at which to collect sharded data distribution metrics.
Only applicable when `sharded_data_distribution` is added to `additional_metrics`.
value:
type: integer
example: 300
- name: collections
description: |
Collect metrics on specific collections from the database specified
Expand Down
38 changes: 38 additions & 0 deletions mongo/datadog_checks/mongo/collectors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# Licensed under a 3-clause BSD style license (see LICENSE)

import re
import time
from functools import wraps

from datadog_checks.base import AgentCheck
from datadog_checks.mongo.metrics import CASE_SENSITIVE_METRIC_NAME_SUFFIXES
Expand All @@ -23,6 +25,7 @@ def __init__(self, check, tags):
self.gauge = self.check.gauge
self.base_tags = tags
self.metrics_to_collect = self.check.metrics_to_collect
self._collector_key = (self.__class__.__name__,)

def collect(self, api):
"""The main method exposed by the collector classes, needs to be implemented by every subclass.
Expand Down Expand Up @@ -126,3 +129,38 @@ def _submit_payload(self, payload, additional_tags=None, metrics_to_collect=None
# Keep old incorrect metric name
# 'top' and 'index', 'collectionscans' metrics are affected
self.gauge(metric_name_alias[:-2], value, tags=tags)

def get_last_collection_timestamp(self):
    # Return the epoch timestamp (seconds) of this collector's last run, or
    # None if it has never collected. The timestamps live on the check object
    # (keyed by self._collector_key) so they persist across check runs.
    return self.check.metrics_last_collection_timestamp.get(self._collector_key)

def set_last_collection_timestamp(self, timestamp):
    # Record the epoch timestamp (seconds) of this collector's latest run in
    # the check-level map, so the interval throttle can compare against it
    # on subsequent check runs.
    self.check.metrics_last_collection_timestamp[self._collector_key] = timestamp


def collection_interval_checker(func):
    """Decorator that throttles a collector's ``collect`` method.

    The wrapped method only runs when the collector's configured
    ``_collection_interval`` (seconds) has elapsed since its last run; the
    last-run timestamp is persisted on the check between invocations.
    Collection always proceeds when no interval is configured, the interval
    is invalid (<= 0), or it equals the check's default interval.
    """

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        now = time.time()
        interval = getattr(self, '_collection_interval', None)
        # No usable custom interval -> behave as if uncontrolled: collect on
        # every check run.
        due = (
            interval is None
            or interval <= 0  # guard against invalid configuration
            or interval == self.check._config.min_collection_interval  # check default
        )
        if not due:
            last_run = self.get_last_collection_timestamp()
            # First run ever, or enough time has passed since the last one.
            due = not last_run or now - last_run >= interval
        if due:
            self.set_last_collection_timestamp(now)
            return func(self, *args, **kwargs)
        self.log.debug("%s skipped: collection interval not reached yet.", self.__class__.__name__)

    return wrapper
8 changes: 7 additions & 1 deletion mongo/datadog_checks/mongo/collectors/coll_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pymongo.errors import OperationFailure

from datadog_checks.base import AgentCheck
from datadog_checks.mongo.collectors.base import MongoCollector
from datadog_checks.mongo.collectors.base import MongoCollector, collection_interval_checker
from datadog_checks.mongo.metrics import COLLECTION_METRICS


Expand All @@ -20,6 +20,11 @@ def __init__(self, check, db_name, tags, coll_names=None):
self.db_name = db_name
self.max_collections_per_database = check._config.database_autodiscovery_config['max_collections_per_database']
self.coll_stats_pipeline_supported = True
self._collection_interval = check._config.metrics_collection_interval['collection']
self._collector_key = (
self.__class__.__name__,
db_name,
) # db_name is part of collector key

def compatible_with(self, deployment):
# Can only be run once per cluster.
Expand Down Expand Up @@ -55,6 +60,7 @@ def _get_collection_stats(self, api, coll_name):
self.coll_stats_pipeline_supported = False
return [api.coll_stats_compatable(self.db_name, coll_name)]

@collection_interval_checker
def collect(self, api):
coll_names = self._get_collections(api)
for coll_name in coll_names:
Expand Down
5 changes: 4 additions & 1 deletion mongo/datadog_checks/mongo/collectors/index_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from pymongo.errors import OperationFailure

from datadog_checks.mongo.collectors.base import MongoCollector
from datadog_checks.mongo.collectors.base import MongoCollector, collection_interval_checker
from datadog_checks.mongo.metrics import INDEX_METRICS


Expand All @@ -16,6 +16,8 @@ def __init__(self, check, db_name, tags, coll_names=None):
self.coll_names = coll_names
self.db_name = db_name
self.max_collections_per_database = check._config.database_autodiscovery_config['max_collections_per_database']
self._collection_interval = check._config.metrics_collection_interval['collections_indexes_stats']
self._collector_key = (self.__class__.__name__, db_name) # db_name is part of collector key

def compatible_with(self, deployment):
# Can only be run once per cluster.
Expand All @@ -26,6 +28,7 @@ def _get_collections(self, api):
return self.coll_names
return api.list_authorized_collections(self.db_name, limit=self.max_collections_per_database)

@collection_interval_checker
def collect(self, api):
coll_names = self._get_collections(api)
for coll_name in coll_names:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

from datadog_checks.mongo.collectors.base import MongoCollector
from datadog_checks.mongo.collectors.base import MongoCollector, collection_interval_checker
from datadog_checks.mongo.common import MongosDeployment
from datadog_checks.mongo.metrics import SHARDED_DATA_DISTRIBUTION_METRICS

Expand All @@ -14,11 +14,13 @@ class ShardedDataDistributionStatsCollector(MongoCollector):

def __init__(self, check, tags):
    super(ShardedDataDistributionStatsCollector, self).__init__(check, tags)
    # Custom collection interval in seconds for these stats; the config
    # defaults it to 300 because $shardedDataDistribution is resource heavy.
    self._collection_interval = check._config.metrics_collection_interval['sharded_data_distribution']

def compatible_with(self, deployment):
    # Can only be run on mongos nodes: sharded data distribution stats are
    # exposed through the mongos router, not individual replica set members.
    return isinstance(deployment, MongosDeployment)

@collection_interval_checker
def collect(self, api):
for distribution in api.sharded_data_distribution_stats():
ns = distribution['ns']
Expand Down
17 changes: 17 additions & 0 deletions mongo/datadog_checks/mongo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def __init__(self, instance, log, init_config):
self.collections_indexes_stats = is_affirmative(instance.get('collections_indexes_stats'))
self.coll_names = instance.get('collections', [])
self.custom_queries = instance.get("custom_queries", [])
self._metrics_collection_interval = instance.get("metrics_collection_interval", {})

self._base_tags = list(set(instance.get('tags', [])))

Expand Down Expand Up @@ -256,3 +257,19 @@ def _get_database_autodiscovery_config(self, instance):
database_autodiscovery_config.get("max_collections_per_database", 100)
)
return database_autodiscovery_config

@property
def metrics_collection_interval(self):
    """Return per-category metric collection intervals (seconds).

    Lets users customize how often each special metric category is
    collected. Categories not overridden in the instance config fall back
    to the check's own collection interval (i.e. every check run), except
    sharded data distribution stats, which default to every 5 minutes
    because they are expensive to gather.
    """
    configured = self._metrics_collection_interval
    every_run = self.min_collection_interval
    return {
        # $collStats and $indexStats run on every check by default, but can
        # get expensive on large databases, hence the user-tunable interval.
        'collection': configured.get('collection', every_run),
        'collections_indexes_stats': configured.get('collections_indexes_stats', every_run),
        # $shardedDataDistribution stats default to 300s due to high resource usage.
        'sharded_data_distribution': configured.get('sharded_data_distribution', 300),
    }
1 change: 1 addition & 0 deletions mongo/datadog_checks/mongo/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def __init__(self, name, init_config, instances=None):
self.metrics_to_collect = self._build_metric_list_to_collect()
self.collectors = []
self.last_states_by_server = {}
self.metrics_last_collection_timestamp = {}

self.deployment_type = None
self._mongo_version = None
Expand Down
13 changes: 13 additions & 0 deletions mongo/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,19 @@ def test_integration_mongos(instance_integration_cluster, aggregator, check, dd_
cluster_name='my_cluster',
modules=['enterprise'],
)
# run the check again to verify sharded data distribution metrics are NOT collected
# because the collection interval is not reached
aggregator.reset()
with mock_pymongo("mongos"):
dd_run_check(mongos_check)

assert_metrics(
mongos_check,
aggregator,
['sharded-data-distribution'],
['sharding_cluster_role:mongos', 'clustername:my_cluster', 'hosting_type:self-hosted'],
count=0,
)


def test_integration_replicaset_primary_in_shard(instance_integration, aggregator, check, dd_run_check):
Expand Down
4 changes: 2 additions & 2 deletions mongo/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .common import HERE


def assert_metrics(check_instance, aggregator, metrics_categories, additional_tags=None):
def assert_metrics(check_instance, aggregator, metrics_categories, additional_tags=None, count=1):
if additional_tags is None:
additional_tags = []
for cat in metrics_categories:
Expand All @@ -17,7 +17,7 @@ def assert_metrics(check_instance, aggregator, metrics_categories, additional_ta
aggregator.assert_metric(
metric['name'],
value=metric['value'],
count=1,
count=count,
tags=additional_tags + metric['tags'] + check_instance.internal_resource_tags,
metric_type=metric['type'],
)
Expand Down

0 comments on commit 8d2bc1e

Please sign in to comment.