Skip to content

Commit

Permalink
implement mqtt and httpx support
Browse files Browse the repository at this point in the history
  • Loading branch information
steersbob committed Nov 27, 2023
1 parent d4a1adf commit 3300764
Show file tree
Hide file tree
Showing 14 changed files with 351 additions and 292 deletions.
92 changes: 34 additions & 58 deletions brewblox_history/app.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,54 @@
import logging
from contextlib import AsyncExitStack, asynccontextmanager
from pprint import pformat

from fastapi import FastAPI

from . import datastore_api, redis, settings
from . import datastore_api, mqtt, redis, relays, victoria
from .models import ServiceConfig

LOGGER = settings.brewblox_logger(__name__)
LOGGER = logging.getLogger(__name__)


def init_logging():
config = ServiceConfig.cached()
level = logging.DEBUG if config.debug else logging.INFO
unimportant_level = logging.INFO if config.debug else logging.WARN
format = '%(asctime)s.%(msecs)03d [%(levelname).1s:%(name)s:%(lineno)d] %(message)s'
datefmt = '%Y/%m/%d %H:%M:%S'

logging.basicConfig(level=level, format=format, datefmt=datefmt)
logging.captureWarnings(True)

logging.getLogger('gmqtt').setLevel(unimportant_level)
logging.getLogger('httpx').setLevel(unimportant_level)
logging.getLogger('httpcore').setLevel(logging.WARN)


def setup():
mqtt.setup()
redis.setup()
victoria.setup()
relays.setup()


@asynccontextmanager
async def lifespan(app: FastAPI):
LOGGER.info(settings.get_config())
LOGGER.debug('\n' + pformat(app.routes))
LOGGER.info(ServiceConfig.cached())
LOGGER.debug('ROUTES:\n' + pformat(app.routes))
# LOGGER.debug('LOGGERS:\n' + pformat(logging.root.manager.loggerDict))

async with AsyncExitStack() as stack:
await stack.enter_async_context(mqtt.lifespan())
await stack.enter_async_context(redis.lifespan())
yield


def create_app():
settings.get_config.cache_clear()
settings.init_logging()
config = settings.get_config()

redis.client.set(redis.RedisClient())
init_logging()
setup()

config = ServiceConfig.cached()
prefix = f'/{config.name}'
app = FastAPI(lifespan=lifespan,
docs_url=f'{prefix}/api/doc',
Expand All @@ -33,52 +58,3 @@ def create_app():
app.include_router(datastore_api.router, prefix=prefix)

return app

# def create_parser():
# parser = service.create_parser('history')
# parser.add_argument('--ranges-interval',
# help='Interval (sec) between updates in live ranges. [%(default)s]',
# default=10,
# type=float)
# parser.add_argument('--metrics-interval',
# help='Interval (sec) between updates in live metrics. [%(default)s]',
# default=5,
# type=float)
# parser.add_argument('--redis-url',
# help='URL for the Redis database',
# default='redis://redis')
# parser.add_argument('--victoria-url',
# help='URL for the Victoria Metrics database',
# default='http://victoria:8428/victoria')
# parser.add_argument('--datastore-topic',
# help='Synchronization topic for datastore updates',
# default='brewcast/datastore')
# parser.add_argument('--minimum-step',
# help='Minimum period (sec) for range data downsampling',
# default=10,
# type=float)
# return parser


# def main():
# parser = create_parser()
# config = service.create_config(parser, model=ServiceConfig)
# app = service.create_app(config)

# async def setup():
# scheduler.setup(app)
# http.setup(app)
# mqtt.setup(app)
# socket_closer.setup(app)
# victoria.setup(app)
# timeseries_api.setup(app)
# redis.setup(app)
# datastore_api.setup(app)
# relays.setup(app)
# error_response.setup(app)

# service.run_app(app, setup())


# if __name__ == '__main__':
# main()
19 changes: 10 additions & 9 deletions brewblox_history/datastore_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
REST endpoints for datastore queries
"""

import logging

from fastapi import APIRouter, Response

from . import redis
from .models import (DatastoreDeleteResponse, DatastoreMultiQuery,
DatastoreMultiValueBox, DatastoreOptSingleValueBox,
DatastoreSingleQuery, DatastoreSingleValueBox)
from .settings import brewblox_logger

LOGGER = brewblox_logger(__name__)
LOGGER = logging.getLogger(__name__)

router = APIRouter(prefix='/datastore', tags=['Datastore'])

Expand All @@ -23,7 +24,7 @@ async def ping(response: Response):
response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate, proxy-revalidate, max-age=0'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
await redis.client.get().ping()
await redis.CV.get().ping()
return {'ping': 'pong'}


Expand All @@ -32,7 +33,7 @@ async def datastore_get(args: DatastoreSingleQuery) -> DatastoreOptSingleValueBo
"""
Get a specific object from the datastore.
"""
value = await redis.client.get().get(args.namespace, args.id)
value = await redis.CV.get().get(args.namespace, args.id)
return DatastoreOptSingleValueBox(value=value)


Expand All @@ -41,7 +42,7 @@ async def datastore_mget(args: DatastoreMultiQuery) -> DatastoreMultiQuery:
"""
Get multiple objects from the datastore.
"""
values = await redis.client.get().mget(args.namespace, args.ids, args.filter)
values = await redis.CV.get().mget(args.namespace, args.ids, args.filter)
return DatastoreMultiValueBox(values=values)


Expand All @@ -50,7 +51,7 @@ async def datastore_set(args: DatastoreOptSingleValueBox) -> DatastoreSingleValu
"""
Create or update an object in the datastore.
"""
value = await redis.client.get().set(args.value)
value = await redis.CV.get().set(args.value)
return DatastoreSingleValueBox(value=value)


Expand All @@ -59,7 +60,7 @@ async def datastore_mset(args: DatastoreMultiValueBox) -> DatastoreMultiValueBox
"""
Create or update multiple objects in the datastore.
"""
values = await redis.client.get().mset(args.values)
values = await redis.CV.get().mset(args.values)
return DatastoreMultiValueBox(values=values)


Expand All @@ -68,7 +69,7 @@ async def datastore_delete(args: DatastoreSingleQuery) -> DatastoreDeleteRespons
"""
Remove a single object from the datastore.
"""
count = await redis.client.get().delete(args.namespace, args.id)
count = await redis.CV.get().delete(args.namespace, args.id)
return DatastoreDeleteResponse(count=count)


Expand All @@ -77,5 +78,5 @@ async def datastore_mdelete(args: DatastoreMultiQuery) -> DatastoreDeleteRespons
"""
Remove multiple objects from the datastore.
"""
count = await redis.client.get().mdelete(args.namespace, args.ids, args.filter)
count = await redis.CV.get().mdelete(args.namespace, args.ids, args.filter)
return DatastoreDeleteResponse(count=count)
13 changes: 11 additions & 2 deletions brewblox_history/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

import collections
from datetime import datetime
from typing import Any, Literal, NamedTuple
from functools import lru_cache
from typing import Any, Literal, NamedTuple, Self

from pydantic import BaseModel, ConfigDict, Field, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
Expand Down Expand Up @@ -47,6 +48,11 @@ class ServiceConfig(BaseSettings):
metrics_interval: float = 10
minimum_step: float = 10

@classmethod
@lru_cache
def cached(cls) -> Self:
return cls()


class HistoryEvent(BaseModel):
model_config = ConfigDict(extra='ignore')
Expand All @@ -58,7 +64,10 @@ class HistoryEvent(BaseModel):
@classmethod
def flatten_data(cls, v):
assert isinstance(v, dict)
return flatten(v)
assert 'key' in v
assert 'data' in v
data = flatten(v['data'])
return {'key': v['key'], 'data': data}


class DatastoreValue(BaseModel):
Expand Down
21 changes: 21 additions & 0 deletions brewblox_history/mqtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from contextlib import asynccontextmanager
from contextvars import ContextVar

from fastapi_mqtt.config import MQTTConfig
from fastapi_mqtt.fastmqtt import FastMQTT

CV: ContextVar[FastMQTT] = ContextVar('FastMQTT')


def setup():
mqtt_config = MQTTConfig(host='eventbus')
fast_mqtt = FastMQTT(config=mqtt_config)
CV.set(fast_mqtt)


@asynccontextmanager
async def lifespan():
fast_mqtt = CV.get()
await fast_mqtt.connection()
yield
await fast_mqtt.client.disconnect()
32 changes: 10 additions & 22 deletions brewblox_history/redis.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
# import json
import logging
from contextlib import asynccontextmanager
from contextvars import ContextVar
from functools import wraps
from itertools import groupby
from typing import Optional

from redis import asyncio as aioredis

from .models import DatastoreValue
from .settings import brewblox_logger, get_config
from .models import DatastoreValue, ServiceConfig

LOGGER = brewblox_logger(__name__)
LOGGER = logging.getLogger(__name__)

client: ContextVar['RedisClient'] = ContextVar('redis_client')
CV: ContextVar['RedisClient'] = ContextVar('RedisClient')


def keycat(namespace: str, key: str) -> str:
Expand All @@ -36,8 +35,7 @@ async def wrapper(self: 'RedisClient', *args, **kwargs):
class RedisClient:

def __init__(self):
# super().__init__(app)
config = get_config()
config = ServiceConfig.cached()
self.url = config.redis_url
self.topic = config.datastore_topic
# Lazy-loaded in autoconnect wrapper
Expand All @@ -47,7 +45,7 @@ async def disconnect(self):
if self._redis:
await self._redis.close()

async def _mkeys(self, namespace: str, ids: Optional[list[str]], filter: Optional[str]) -> list[str]:
async def _mkeys(self, namespace: str, ids: list[str] | None, filter: str | None) -> list[str]:
keys = [keycat(namespace, key) for key in (ids or [])]
if filter is not None:
keys += [key.decode()
Expand Down Expand Up @@ -83,7 +81,7 @@ async def ping(self):
await self._redis.ping()

@autoconnect
async def get(self, namespace: str, id: str) -> Optional[DatastoreValue]:
async def get(self, namespace: str, id: str) -> DatastoreValue | None:
resp = await self._redis.get(keycat(namespace, id))
return DatastoreValue.parse_raw(resp) if resp else None

Expand Down Expand Up @@ -131,21 +129,11 @@ async def mdelete(self, namespace: str, ids: list[str] = None, filter: str = Non
return count


# def setup(app: web.Application):
# features.add(app, RedisClient(app))


# def fget(app: web.Application) -> RedisClient:
# return features.get(app, RedisClient)
def setup():
CV.set(RedisClient())


@asynccontextmanager
async def lifespan():
# impl = RedisClient()
# token = client.set(impl)
# yield
# client.reset(token)
# await impl.disconnect()
yield
await client.get().disconnect()
LOGGER.info('goodbye')
await CV.get().disconnect()
Loading

0 comments on commit 3300764

Please sign in to comment.