Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions drow/converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from decimal import Decimal
from typing import TypeVar
from collections.abc import Callable

T = TypeVar("T")
Converter = Callable[[str], T]


def no_op(value: T) -> T:
return value


def convert_to_float(value: str) -> float:
return float(value)


def convert_to_decimal(value: str) -> Decimal:
return Decimal(value)
23 changes: 13 additions & 10 deletions drow/model.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from dataclasses import dataclass
from typing import TypeVar, Generic

T = TypeVar("T")


@dataclass(frozen=True)
class ScalarPoint:
class ScalarPoint(Generic[T]):
timestamp: float
value: str
value: T


@dataclass(frozen=True)
Expand All @@ -14,22 +17,22 @@ class StringPoint:


@dataclass(frozen=True)
class InstantSeries:
class InstantSeries(Generic[T]):
metric: dict[str, str]
value: ScalarPoint
value: ScalarPoint[T]


@dataclass(frozen=True)
class InstantVector:
series: list[InstantSeries]
class InstantVector(Generic[T]):
series: list[InstantSeries[T]]


@dataclass(frozen=True)
class RangeSeries:
class RangeSeries(Generic[T]):
metric: dict[str, str]
values: list[ScalarPoint]
values: list[ScalarPoint[T]]


@dataclass(frozen=True)
class Matrix:
series: list[RangeSeries]
class Matrix(Generic[T]):
series: list[RangeSeries[T]]
96 changes: 74 additions & 22 deletions drow/parser.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
from typing import Union
from typing import Union, TypeVar
from collections.abc import Callable

from .annotation import (
SuccessResponse, ErrorResponse,
ScalarInstantVector, ScalarRangeVector,
ScalarPointData,
VectorData, MatrixData, ScalarData, StringData,
)
from .model import (
ScalarPoint, StringPoint,
InstantSeries, RangeSeries,
InstantVector, Matrix,
)
from .converter import Converter

T = TypeVar("T")
ValueType = TypeVar("ValueType")
ResponseType = TypeVar("ResponseType")
ResultType = TypeVar("ResultType")

QueryResponse = Union[
SuccessResponse[ScalarData],
SuccessResponse[StringData],
SuccessResponse[VectorData],
ErrorResponse,
]
QueryResult = Union[
ScalarPoint, StringPoint, InstantVector,
type QueryResult[T] = Union[
ScalarPoint[T], StringPoint, InstantVector[T],
]
QueryRangeResponse = Union[
SuccessResponse[MatrixData],
Expand All @@ -36,7 +43,10 @@ class ParseError(Exception):
pass


def parse_query_response(resp: QueryResponse) -> QueryResult:
def generic_parse_query_response(
resp: QueryResponse,
value_converter: Converter[T],
) -> QueryResult[T]:
if resp["status"] == "error":
parse_error(resp)

Expand All @@ -48,10 +58,10 @@ def parse_query_response(resp: QueryResponse) -> QueryResult:
return parse_string(data)

if data["resultType"] == "scalar":
return parse_scalar(data)
return parse_scalar(data, value_converter=value_converter)

if data["resultType"] == "vector":
return parse_vector(data)
return parse_vector(data, value_converter=value_converter)

raise ParseError(f'unknown result type: {data["resultType"]}')

Expand All @@ -62,24 +72,37 @@ def parse_error(resp: ErrorResponse) -> None:
)


def parse_instant_series(data: ScalarInstantVector) -> InstantSeries:
def parse_instant_series(
data: ScalarInstantVector,
value_converter: Converter[T],
) -> InstantSeries[T]:
return InstantSeries(
metric=data["metric"],
value=ScalarPoint(*data["value"])
# value=ScalarPoint(*data["value"])
value=parse_scalar_point(
data["value"], value_converter=value_converter,
),
)


def parse_range_series(data: ScalarRangeVector) -> RangeSeries:
def parse_range_series(
data: ScalarRangeVector,
value_converter: Converter[T]
) -> RangeSeries[T]:
return RangeSeries(
metric=data["metric"],
values=[
ScalarPoint(*i)
# ScalarPoint(*i)
parse_scalar_point(i, value_converter=value_converter)
for i in data["values"]
]
)


def parse_query_range_response(resp: QueryRangeResponse) -> QueryRangeResult:
def generic_parse_query_range_response(
resp: QueryRangeResponse,
value_converter: Converter[T],
) -> QueryRangeResult[T]:
if resp["status"] == "error":
parse_error(resp)

Expand All @@ -88,32 +111,48 @@ def parse_query_range_response(resp: QueryRangeResponse) -> QueryRangeResult:
data = resp["data"]
assert data["resultType"] == "matrix", resp

return parse_matrix(data)
return parse_matrix(data, value_converter=value_converter)


def parse_vector(data: VectorData) -> InstantVector:
def parse_vector(
data: VectorData,
value_converter: Converter[T],
) -> InstantVector[T]:
return InstantVector(series=[
parse_instant_series(i)
parse_instant_series(i, value_converter=value_converter)
for i in data["result"]
])


def parse_matrix(data: MatrixData) -> Matrix:
def parse_matrix(data: MatrixData, value_converter: Converter[T]) -> Matrix[T]:
return Matrix(series=[
parse_range_series(i)
parse_range_series(i, value_converter=value_converter)
for i in data["result"]
])


def parse_scalar(data: ScalarData) -> ScalarPoint:
return ScalarPoint(*data["result"])
def parse_scalar(
data: ScalarData, value_converter: Converter[T],
) -> ScalarPoint[T]:
return parse_scalar_point(data["result"], value_converter)


def parse_scalar_point(
data: ScalarPointData,
value_converter: Converter[T],
) -> ScalarPoint[T]:
t, v = data
return ScalarPoint(t, value_converter(v))


def parse_string(data: StringData) -> StringPoint:
return StringPoint(*data["result"])


def parse_query_value_response(resp: QueryResponse) -> str:
def generic_parse_query_value_response(
resp: QueryResponse,
value_converter: Converter[T],
) -> T:
if resp["status"] == "error":
parse_error(resp)

Expand All @@ -122,16 +161,29 @@ def parse_query_value_response(resp: QueryResponse) -> str:
data = resp["data"]

if data["resultType"] == "string":
return data["result"][1]
return value_converter(data["result"][1])

if data["resultType"] == "scalar":
return data["result"][1]
return value_converter(data["result"][1])

if data["resultType"] == "vector":
series_count = len(data["result"])
if series_count != 1:
raise ParseError(f"series count incorrect: {series_count}")

return data["result"][0]["value"][1]
return value_converter(data["result"][0]["value"][1])

raise ParseError(f'unknown result type: {data["resultType"]}')


def make_parser(
origin_parser: Callable[
[ResponseType, Converter[ValueType]],
ResultType
],
value_converter: Converter[ValueType],
) -> Callable[[ResponseType], ResultType]:
def parser(resp: ResponseType) -> ResultType:
return origin_parser(resp, value_converter)

return parser
31 changes: 31 additions & 0 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from decimal import Decimal
from unittest import TestCase

from drow.annotation import SuccessResponse, VectorData
from drow.model import InstantVector
from drow.parser import generic_parse_query_response
from drow.converter import convert_to_decimal


class TestConverter(TestCase):
def test_convert_to_decimal(self) -> None:
resp: SuccessResponse[VectorData] = {
"status": "success",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"job": "foo",
},
"value": (1435781451.781, "1.23456789"),
},
],
},
}
parsed = generic_parse_query_response(resp, convert_to_decimal)
assert isinstance(parsed, InstantVector)
value = parsed.series[0].value.value

assert isinstance(value, Decimal)
self.assertEqual(value, Decimal("1.23456789"))
18 changes: 15 additions & 3 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,23 @@
InstantVector, Matrix,
)
from drow.parser import (
parse_query_response,
parse_query_range_response,
parse_query_value_response,
generic_parse_query_response,
generic_parse_query_range_response,
generic_parse_query_value_response,
PrometheusError,
ParseError,
make_parser,
)
from drow.converter import no_op

parse_query_response = make_parser(
generic_parse_query_response, no_op
)
parse_query_range_response = make_parser(
generic_parse_query_range_response, no_op
)
parse_query_value_response = make_parser(
generic_parse_query_value_response, no_op
)


Expand Down