Skip to content

Commit

Permalink
Add default fields duration, metrics duration
Browse files Browse the repository at this point in the history
  • Loading branch information
steersbob committed Jan 15, 2024
1 parent c68b45e commit 83d5cda
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 47 deletions.
14 changes: 9 additions & 5 deletions brewblox_history/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

import collections
from datetime import datetime
from datetime import datetime, timedelta
from typing import Any, Literal, NamedTuple

from pydantic import (BaseModel, ConfigDict, Field, field_validator,
Expand Down Expand Up @@ -56,9 +56,12 @@ class ServiceConfig(BaseSettings):
history_topic: str = 'brewcast/history'
datastore_topic: str = 'brewcast/datastore'

ranges_interval: float = 10
metrics_interval: float = 10
minimum_step: float = 10
ranges_interval: timedelta = timedelta(seconds=10)
metrics_interval: timedelta = timedelta(seconds=10)
minimum_step: timedelta = timedelta(seconds=10)

query_duration_default: timedelta = timedelta(days=1)
query_desired_points: int = 1000


class HistoryEvent(BaseModel):
Expand Down Expand Up @@ -113,11 +116,12 @@ class DatastoreDeleteResponse(BaseModel):


class TimeSeriesFieldsQuery(BaseModel):
duration: str
duration: str = Field('10m', examples=['10m', '1d'])


class TimeSeriesMetricsQuery(BaseModel):
fields: list[str]
duration: str = Field('10m', examples=['10m', '1d'])


class TimeSeriesMetric(BaseModel):
Expand Down
8 changes: 4 additions & 4 deletions brewblox_history/timeseries_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async def protected(desc: str):


async def _stream_ranges(ws: WebSocket, id: str, query: TimeSeriesRangesQuery):
interval = utils.get_config().ranges_interval
config = utils.get_config()
open_ended = utils.is_open_ended(start=query.start,
duration=query.duration,
end=query.end)
Expand All @@ -135,11 +135,11 @@ async def _stream_ranges(ws: WebSocket, id: str, query: TimeSeriesRangesQuery):
if not open_ended:
break

await asyncio.sleep(interval)
await asyncio.sleep(config.ranges_interval.total_seconds())


async def _stream_metrics(ws: WebSocket, id: str, query: TimeSeriesMetricsQuery):
interval = utils.get_config().metrics_interval
config = utils.get_config()

while True:
async with protected('metrics push'):
Expand All @@ -152,7 +152,7 @@ async def _stream_metrics(ws: WebSocket, id: str, query: TimeSeriesMetricsQuery)
'data': jsonable_encoder(data),
})

await asyncio.sleep(interval)
await asyncio.sleep(config.metrics_interval.total_seconds())


@router.websocket('/stream')
Expand Down
29 changes: 16 additions & 13 deletions brewblox_history/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@
import ciso8601
from pytimeparse.timeparse import timeparse

from . import utils
from .models import ServiceConfig

FLAT_SEPARATOR = '/'
DESIRED_POINTS = 1000
DEFAULT_DURATION = timedelta(days=1)

LOGGER = logging.getLogger(__name__)

DurationSrc_ = str | int | float | timedelta
DatetimeSrc_ = str | int | float | datetime | None


Expand Down Expand Up @@ -50,11 +48,16 @@ def strex(ex: Exception, tb=False):
return msg


def parse_duration(value: str) -> timedelta:
def parse_duration(value: DurationSrc_) -> timedelta:
if isinstance(value, timedelta):
return value

try:
return timedelta(seconds=float(value))
total_seconds = float(value)
except ValueError:
return timedelta(seconds=timeparse(value))
total_seconds = timeparse(value)

return timedelta(seconds=total_seconds)


def parse_datetime(value: DatetimeSrc_) -> datetime | None:
Expand Down Expand Up @@ -124,9 +127,8 @@ def now() -> datetime: # pragma: no cover


def select_timeframe(start: DatetimeSrc_,
duration: str,
duration: DurationSrc_,
end: DatetimeSrc_,
min_step: timedelta,
) -> tuple[str, str, str]:
"""Calculate start, end, and step for given start, duration, and end
Expand All @@ -135,14 +137,15 @@ def select_timeframe(start: DatetimeSrc_,
`duration` is formatted as `{value}s`.
"""
config = utils.get_config()
dt_start: datetime | None = None
dt_end: datetime | None = None

if all([start, duration, end]):
raise ValueError('At most two out of three timeframe arguments can be provided')

elif not any([start, duration, end]):
dt_start = now() - DEFAULT_DURATION
dt_start = now() - config.query_duration_default
dt_end = None

elif start and duration:
Expand All @@ -167,7 +170,7 @@ def select_timeframe(start: DatetimeSrc_,

elif end:
dt_end = parse_datetime(end)
dt_start = dt_end - DEFAULT_DURATION
dt_start = dt_end - config.query_duration_default

# This path should never be reached
else: # pragma: no cover
Expand All @@ -176,8 +179,8 @@ def select_timeframe(start: DatetimeSrc_,
# Calculate optimal step interval
# We want a decent resolution without flooding the front-end with data
actual_duration: timedelta = (dt_end or now()) - dt_start
desired_step = actual_duration.total_seconds() // DESIRED_POINTS
step = int(max(desired_step, min_step.total_seconds()))
desired_step = actual_duration.total_seconds() // config.query_desired_points
step = int(max(desired_step, config.minimum_step.total_seconds()))

return (
format_datetime(dt_start, 's'),
Expand Down
10 changes: 4 additions & 6 deletions brewblox_history/victoria.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import logging
from contextvars import ContextVar
from datetime import timedelta
from urllib.parse import quote

import httpx
Expand Down Expand Up @@ -33,7 +32,6 @@ def __init__(self):
str(config.victoria_port),
config.victoria_path,
])
self._minimum_step = timedelta(seconds=config.minimum_step)
self._query_headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept-Encoding': 'gzip',
Expand Down Expand Up @@ -65,16 +63,17 @@ async def fields(self, args: TimeSeriesFieldsQuery) -> list[str]:
return retv

async def metrics(self, args: TimeSeriesMetricsQuery) -> list[TimeSeriesMetric]:
start = utils.now() - utils.parse_duration(args.duration)
return list((
v for k, v in self._cached_metrics.items()
if k in args.fields
and v.timestamp >= start
))

async def ranges(self, args: TimeSeriesRangesQuery) -> list[TimeSeriesRange]:
start, end, step = utils.select_timeframe(args.start,
args.duration,
args.end,
self._minimum_step)
args.end)
queries = [
f'query=avg_over_time({{__name__="{quote(f)}"}}[{step}])&step={step}&start={start}&end={end}'
for f in args.fields
Expand All @@ -95,8 +94,7 @@ async def ranges(self, args: TimeSeriesRangesQuery) -> list[TimeSeriesRange]:
async def csv(self, args: TimeSeriesCsvQuery):
start, end, _ = utils.select_timeframe(args.start,
args.duration,
args.end,
self._minimum_step)
args.end)
matches = '&'.join([
f'match[]={{__name__="{quote(f)}"}}'
for f in args.fields
Expand Down
7 changes: 4 additions & 3 deletions test/test_timeseries_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

import asyncio
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from time import time_ns
from unittest.mock import ANY, AsyncMock, Mock

Expand Down Expand Up @@ -168,7 +168,7 @@ async def csv_mock(args: TimeSeriesCsvQuery):


async def test_stream(client: AsyncClient, config: ServiceConfig, m_victoria: Mock):
config.ranges_interval = 0.001
config.ranges_interval = timedelta(milliseconds=1)
m_victoria.metrics.return_value = [
TimeSeriesMetric(
metric='a',
Expand Down Expand Up @@ -275,7 +275,8 @@ async def test_stream(client: AsyncClient, config: ServiceConfig, m_victoria: Mo
return_exceptions=True)


async def test_stream_error(client: AsyncClient, m_victoria: Mock):
async def test_stream_error(client: AsyncClient, config: ServiceConfig, m_victoria: Mock):
config.ranges_interval = timedelta(milliseconds=1)
dt = datetime(2021, 7, 15, 19, tzinfo=timezone.utc)
m_victoria.ranges.side_effect = RuntimeError
m_victoria.metrics.return_value = [
Expand Down
28 changes: 12 additions & 16 deletions test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def test_strex():
def test_parse_duration():
assert utils.parse_duration('2h10m') == timedelta(hours=2, minutes=10)
assert utils.parse_duration('10') == timedelta(seconds=10)
assert utils.parse_duration(timedelta(hours=1)) == timedelta(minutes=60)

with pytest.raises(TypeError):
utils.parse_duration('')
Expand Down Expand Up @@ -95,24 +96,24 @@ def fmt(dt: datetime) -> str:
return str(int(dt.timestamp()))

mocker.patch(TESTED + '.now').side_effect = now
min_step = timedelta(seconds=10)

with pytest.raises(ValueError):
utils.select_timeframe(start='yesterday',
duration='2d',
end='tomorrow',
min_step=min_step)
end='tomorrow')

assert utils.select_timeframe(None, None, None, min_step) == (
assert utils.select_timeframe(None,
None,
None
) == (
fmt(datetime(2021, 7, 14, 19)),
'',
'86s'
)

assert utils.select_timeframe(start=now(),
duration='1h',
end=None,
min_step=min_step
end=None
) == (
fmt(now()),
fmt(datetime(2021, 7, 15, 20)),
Expand All @@ -121,8 +122,7 @@ def fmt(dt: datetime) -> str:

assert utils.select_timeframe(start=now(),
duration=None,
end=datetime(2021, 7, 15, 20),
min_step=min_step
end=datetime(2021, 7, 15, 20)
) == (
fmt(now()),
fmt(datetime(2021, 7, 15, 20)),
Expand All @@ -131,8 +131,7 @@ def fmt(dt: datetime) -> str:

assert utils.select_timeframe(start=None,
duration='1h',
end=datetime(2021, 7, 15, 20),
min_step=min_step
end=datetime(2021, 7, 15, 20)
) == (
fmt(now()),
fmt(datetime(2021, 7, 15, 20)),
Expand All @@ -141,8 +140,7 @@ def fmt(dt: datetime) -> str:

assert utils.select_timeframe(start=datetime(2021, 7, 15, 18),
duration=None,
end=None,
min_step=min_step
end=None
) == (
fmt(datetime(2021, 7, 15, 18)),
'',
Expand All @@ -151,8 +149,7 @@ def fmt(dt: datetime) -> str:

assert utils.select_timeframe(start=None,
duration='1h',
end=None,
min_step=min_step
end=None
) == (
fmt(datetime(2021, 7, 15, 18)),
'',
Expand All @@ -161,8 +158,7 @@ def fmt(dt: datetime) -> str:

assert utils.select_timeframe(start=None,
duration=None,
end=now(),
min_step=min_step
end=now()
) == (
fmt(datetime(2021, 7, 14, 19)),
fmt(now()),
Expand Down

0 comments on commit 83d5cda

Please sign in to comment.