Skip to content

Commit

Permalink
Merge branch 'improved-metrics'
Browse files Browse the repository at this point in the history
  • Loading branch information
alastair committed May 3, 2021
2 parents fde1981 + cdb8676 commit 58ed2aa
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 275 deletions.
219 changes: 49 additions & 170 deletions brainzutils/metrics.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import datetime
from functools import wraps

from redis import ResponseError
import os
import socket
from time import time_ns
from typing import Dict

from brainzutils import cache

NAMESPACE_METRICS = "metrics"
REDIS_MAX_INTEGER = 2**63-1
RESERVED_TAG_NAMES = {"tag", "date"}
STATS_COUNT_DATE_KEY = "date"

REDIS_METRICS_KEY = "metrics:influx_data"
_metrics_project_name = None


Expand All @@ -29,172 +26,54 @@ def decorated(*args, **kwargs):

@cache.init_required
@metrics_init_required
def increment(metric_name, amount=1):
"""Increment the counter for a metric by a set amount. A metric is a counter that can increment over time
and can be used for monitoring any statistic.
If incrementing the counter causes it to go over redis' internal counter limit of 2**63-1, the counter
is reset to 0. The metric name ``tag`` is reserved and cannot be used.
Arguments:
metric_name: the name of a metric
amount: the amount to increase the counter by, must be 1 or greater (default: 1)
Raises:
ValueError if amount is less than 1 or greater than 2**63-1
ValueError if the reserved metric name ``tag`` is used
def set(metric_name: str, tags: Dict[str, str] = None, timestamp: int = None, **fields):
"""
Submit a metric to be read by the MetaBrainz influx datastore for graphing/monitoring
purposes. These metrics are stored in redis in the influxdb line protocol format:
https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/
Args:
metric_name: The name of the metric to record.
tags: Additional influx tags to write with the metric. (optional)
timestamp: A nanosecond timestamp to use for this metric. If not provided
the current time is used.
fields: The key, value pairs to store with this metric.
"""

if amount < 1:
raise ValueError("amount must be 1 or greater")
if amount > REDIS_MAX_INTEGER:
raise ValueError("amount is too large")
if metric_name in RESERVED_TAG_NAMES:
raise ValueError("the name '{}' is reserved".format(metric_name))

# Add types to influx data
try:
ret = cache.hincrby(_metrics_project_name, metric_name, amount, namespace=NAMESPACE_METRICS)
except ResponseError as e:
# If the current value is too large, redis will return this error message.
# Reset to 0 and re-increment
if e.args and "increment or decrement would overflow" in e.args[0]:
cache.hset(_metrics_project_name, metric_name, 0, namespace=NAMESPACE_METRICS)
ret = cache.hincrby(_metrics_project_name, metric_name, amount, namespace=NAMESPACE_METRICS)
host = os.environ['PRIVATE_IP']
except KeyError:
host = socket.gethostname()

if tags is None:
tags = {}

tags["dc"] = "hetzner"
tags["server"] = host
tags["project"] = _metrics_project_name
tag_string = ",".join([ "%s=%s" % (k, v) for k, v in tags.items() ])

fields_list = []
for k, v in fields.items():
if type(v) == int:
fields_list.append("%s=%di" % (k, v))
elif type(v) == bool:
val = "t" if v else "f"
fields_list.append("%s=%s" % (k, val))
elif type(fields[k]) == str:
fields_list.append('%s="%s"' % (k, v))
else:
raise

return ret


@cache.init_required
@metrics_init_required
def remove(metric_name):
    """Delete one metric counter belonging to the current project.

    After removal the metric is no longer part of the data
    returned by :meth:`stats`.

    Arguments:
        metric_name: The metric to remove

    Returns:
        The result of the underlying ``cache.hdel`` call.
    """
    # cache.hdel takes a list of hash fields, so wrap the single name.
    fields_to_drop = [metric_name]
    return cache.hdel(_metrics_project_name, fields_to_drop, namespace=NAMESPACE_METRICS)


@cache.init_required
@metrics_init_required
def stats():
    """Return every counter recorded for the currently configured project.

    A flask view can expose the counters directly::

        @bp.route('/metric_statistics')
        def increment_metric():
            return jsonify(metrics.stats())

    The view can be read by telegraf or any other logging system.

    Returns:
        A dictionary mapping each metric name to its integer count, plus a
        ``tag`` entry holding the current project name. For example::

            {"new_users": 100,
             "computed_stats": 20,
             "tag": "listenbrainz.org"}
    """
    raw_counters = cache.hgetall(_metrics_project_name, namespace=NAMESPACE_METRICS)

    result = {}
    for name, count in raw_counters.items():
        # Values are converted with int(); keys are normalised with str().
        result[str(name)] = int(count)

    # Added last so the project name always wins over a counter called "tag".
    result["tag"] = _metrics_project_name
    return result
fields_list.append("%s=%s" % (k, str(v)))

fields = " ".join(fields_list)

@cache.init_required
@metrics_init_required
def set_count(metric_name, **data):
    """Set fixed counters for a given metric name.

    This allows you to log the result of a given computation that happens
    periodically. For example, if you have an import process that happens
    periodically, you could call something like::

        metrics.set_count('import', artists=10, releases=27, recordings=100)

    to set some fixed counts for the ``import`` metric. The counters are
    stored together with the time at which this function was called
    (``datetime.datetime.now()``, truncated to whole seconds).

    Unlike incrementing statistics, a specific number cannot be incremented.
    To set new values for a subsequent iteration of the process, call it again.
    The statistics for a given metric can be retrieved with :meth:`stats_count`.

    Calling ``set_count`` with the same metric name but different data will
    cause all previous data values to be cleared. An entire metric can be
    removed with :meth:`remove_count`.

    Arguments:
        metric_name: the name of the metric to store counters for
        data: counter names and their values, given as keyword arguments

    Raises:
        ValueError if a counter name is one of ``RESERVED_TAG_NAMES``
        ValueError if a counter value cannot be converted with ``int()``
    """
    # Validate everything up front so nothing is written when any argument is bad.
    for k, v in data.items():
        if k in RESERVED_TAG_NAMES:
            raise ValueError("the name '{}' is reserved".format(k))
        try:
            int(v)
        except (TypeError, ValueError):
            # int() raises TypeError (not ValueError) for values such as None
            # or a list; report both cases with the same error.
            raise ValueError("Argument values must be integers")
        # NOTE(review): int() also accepts floats and numeric strings, so such
        # values pass this check and are stored unchanged below.

    metric_key = _metrics_project_name + ":" + metric_name
    # Override all values for this key by deleting it if it already exists
    cache.delete(metric_key, namespace=NAMESPACE_METRICS)
    for k, v in data.items():
        cache.hset(metric_key, k, v, namespace=NAMESPACE_METRICS)

    # Record when the counters were written, at one-second resolution.
    now = datetime.datetime.now()
    now = now.replace(microsecond=0)
    cache.hset(metric_key, "date", now.isoformat(), namespace=NAMESPACE_METRICS)


@cache.init_required
@metrics_init_required
def remove_count(metric_name):
    """Delete all fixed counters stored for one metric.

    Every counter saved under the given metric name for the current
    project is removed from local storage.

    Arguments:
        metric_name: The metric to delete

    Returns:
        The result of the underlying ``cache.delete`` call.
    """
    # Fixed counters live under a "<project>:<metric>" key.
    storage_key = ":".join((_metrics_project_name, metric_name))
    return cache.delete(storage_key, namespace=NAMESPACE_METRICS)


@cache.init_required
@metrics_init_required
def stats_count(metric_name):
"""Get the fixed counters for a given metric in the currently configured project.
This can be used in a flask view to return the current metrics::
@bp.route('/metric_counts/<metric_name>')
def increment_metric(metric_name):
return jsonify(metrics.stats_count(metric_name))
if timestamp is None:
timestamp = time_ns()

The view can be read by telegraf or any other logging system.
Returns:
A dictionary containing fixed counters for the given metric, as well
as a field ``tag`` containing the current project name, a field ``metric`` containing
the requested metric, and ``date`` containing the date that the metric was last written.
For example::
{"artists": 10,
"releases": 29,
"recordings": 100,
"date": "2021-02-17T13:02:18",
"metric": "import",
"tag": "listenbrainz.org"}
"""

metric_key = _metrics_project_name + ":" + metric_name
counters = cache.hgetall(metric_key, namespace=NAMESPACE_METRICS)
ret = {}
for k, v in counters.items():
k = str(k)
if k == STATS_COUNT_DATE_KEY:
ret[k] = v.decode('utf-8')
else:
ret[k] = int(v)
ret["metric"] = metric_name
ret["tag"] = _metrics_project_name
return ret
metric = "%s,%s %s %d" % (metric_name, tag_string, fields, timestamp)
try:
cache._r.rpush(REDIS_METRICS_KEY, metric)
except Exception:
# If we fail to push the metric to redis, so be it.
pass
112 changes: 7 additions & 105 deletions brainzutils/test/test_metrics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
from unittest import mock, TestCase
from freezegun import freeze_time
from redis import ResponseError

from brainzutils import cache
from brainzutils import metrics
Expand All @@ -14,107 +13,10 @@ def setUp(self):
def tearDown(self):
metrics._metrics_project_name = None

@mock.patch('brainzutils.metrics.cache.hincrby')
def test_increment(self, hincrby):
@mock.patch('brainzutils.metrics.cache._r.rpush')
def test_set(self, rpush):
metrics.init('listenbrainz.org')
metrics.increment('test_m', 2)
hincrby.assert_called_with('listenbrainz.org', 'test_m', 2, namespace='metrics')

@mock.patch('brainzutils.metrics.cache.hincrby')
def test_increment_default(self, hincrby):
metrics.init('listenbrainz.org')
metrics.increment('test_m')
hincrby.assert_called_with('listenbrainz.org', 'test_m', 1, namespace='metrics')

def test_increment_negative(self):
metrics.init('listenbrainz.org')
with self.assertRaises(ValueError):
metrics.increment('test_m', -2)

def test_increment_badname(self):
metrics.init('listenbrainz.org')
with self.assertRaises(ValueError):
metrics.increment('tag')

def test_increment_noinit(self):
with self.assertRaises(RuntimeError):
metrics.increment('test_m')

@mock.patch('brainzutils.metrics.cache.hincrby')
@mock.patch('brainzutils.metrics.cache.hset')
def test_increment_overflow(self, hset, hincrby):
hincrby.side_effect = [ResponseError("increment or decrement would overflow"), 10]
metrics.init('listenbrainz.org')
metrics.increment('test_m', 10)

hincrby.assert_has_calls([mock.call('listenbrainz.org', 'test_m', 10, namespace='metrics'),
mock.call('listenbrainz.org', 'test_m', 10, namespace='metrics')])
hset.assert_called_with('listenbrainz.org', 'test_m', 0, namespace='metrics')

@mock.patch('brainzutils.metrics.cache.hdel')
def test_remove(self, hdel):
metrics.init('listenbrainz.org')
metrics.remove('test_m')
hdel.assert_called_with('listenbrainz.org', ['test_m'], namespace='metrics')

@mock.patch('brainzutils.metrics.cache.hgetall')
def test_stats(self, hgetall):
metrics.init('listenbrainz.org')
hgetall.return_value = {'valueone': b'1',
'valuetwo': b'20',
'somethingelse': b'8'}

stats = metrics.stats()
hgetall.assert_called_with('listenbrainz.org', namespace='metrics')

expected = {'valueone': 1,
'valuetwo': 20,
'somethingelse': 8,
'tag': 'listenbrainz.org'}

self.assertEqual(stats, expected)

@freeze_time('2021-02-15T10:22:00')
@mock.patch('brainzutils.metrics.cache.hset')
@mock.patch('brainzutils.metrics.cache.delete')
def test_set_count(self, mock_del, hset):
metrics.init('listenbrainz.org')
metrics.set_count('dataimport', artists=10, recordings=2)

mock_del.assert_called_with('listenbrainz.org:dataimport', namespace='metrics')
hset.assert_has_calls([mock.call('listenbrainz.org:dataimport', 'artists', 10, namespace='metrics'),
mock.call('listenbrainz.org:dataimport', 'recordings', 2, namespace='metrics'),
mock.call('listenbrainz.org:dataimport', 'date', '2021-02-15T10:22:00', namespace='metrics')],
any_order=True)

def test_set_count_invalid_values(self):
metrics.init('listenbrainz.org')
with self.assertRaises(ValueError):
metrics.set_count('dataimport', date=1)

with self.assertRaises(ValueError):
metrics.set_count('dataimport', artists='not-an-int')

@mock.patch('brainzutils.metrics.cache.delete')
def test_remove_count(self, mock_del):
metrics.init('listenbrainz.org')
metrics.remove_count('dataimport')
mock_del.assert_called_with('listenbrainz.org:dataimport', namespace='metrics')

@mock.patch('brainzutils.metrics.cache.hgetall')
def test_stats_count(self, hgetall):
metrics.init('listenbrainz.org')
hgetall.return_value = {'valueone': b'1',
'valuetwo': b'20',
'date': b'2021-02-12T13:02:18'}

stats = metrics.stats_count('dataimport')
hgetall.assert_called_with('listenbrainz.org:dataimport', namespace='metrics')

expected = {'valueone': 1,
'valuetwo': 20,
'date': '2021-02-12T13:02:18',
'metric': 'dataimport',
'tag': 'listenbrainz.org'}

self.assertEqual(stats, expected)
os.environ["PRIVATE_IP"] = "127.0.0.1"
metrics.set("my_metric", timestamp=1619629462352960742, test_i=2, test_fl=.3, test_t=True, test_f=False, test_s="gobble")
rpush.assert_called_with(metrics.REDIS_METRICS_KEY,
'my_metric,dc=hetzner,server=127.0.0.1,project=listenbrainz.org test_i=2i test_fl=0.3 test_t=t test_f=f test_s="gobble" 1619629462352960742')

0 comments on commit 58ed2aa

Please sign in to comment.