Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 76 additions & 114 deletions drow/parser.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import Union, TypeVar
from collections.abc import Callable
from typing import Union, TypeVar, Generic

from .annotation import (
SuccessResponse, ErrorResponse,
Expand Down Expand Up @@ -43,147 +42,110 @@ class ParseError(Exception):
pass


def generic_parse_query_response(
resp: QueryResponse,
value_converter: Converter[T],
) -> QueryResult[T]:
if resp["status"] == "error":
parse_error(resp)
class BaseParser(Generic[T]):
def parse_value(self, value: str) -> T:
raise NotImplementedError

assert resp["status"] == "success", resp
def parse_query_response(self, resp: QueryResponse) -> QueryResult[T]:
if resp["status"] == "error":
self.parse_error(resp)

data = resp["data"]
assert resp["status"] == "success", resp

if data["resultType"] == "string":
return parse_string(data)
data = resp["data"]

if data["resultType"] == "scalar":
return parse_scalar(data, value_converter=value_converter)
if data["resultType"] == "string":
return self.parse_string(data)

if data["resultType"] == "vector":
return parse_vector(data, value_converter=value_converter)
if data["resultType"] == "scalar":
return self.parse_scalar(data)

raise ParseError(f'unknown result type: {data["resultType"]}')
if data["resultType"] == "vector":
return self.parse_vector(data)

raise ParseError(f'unknown result type: {data["resultType"]}')

def parse_error(resp: ErrorResponse) -> None:
raise PrometheusError(
f'error {resp["errorType"]}: {resp["error"]}'
)
def parse_error(self, resp: ErrorResponse) -> None:
raise PrometheusError(
f'error {resp["errorType"]}: {resp["error"]}'
)

def parse_instant_series(
self, data: ScalarInstantVector,
) -> InstantSeries[T]:
return InstantSeries(
metric=data["metric"],
value=self.parse_scalar_point(data["value"]),
)

def parse_instant_series(
data: ScalarInstantVector,
value_converter: Converter[T],
) -> InstantSeries[T]:
return InstantSeries(
metric=data["metric"],
# value=ScalarPoint(*data["value"])
value=parse_scalar_point(
data["value"], value_converter=value_converter,
),
)
def parse_range_series(self, data: ScalarRangeVector) -> RangeSeries[T]:
return RangeSeries(
metric=data["metric"],
values=[self.parse_scalar_point(i) for i in data["values"]]
)

def parse_query_range_response(
self, resp: QueryRangeResponse,
) -> QueryRangeResult[T]:
if resp["status"] == "error":
self.parse_error(resp)

def parse_range_series(
data: ScalarRangeVector,
value_converter: Converter[T]
) -> RangeSeries[T]:
return RangeSeries(
metric=data["metric"],
values=[
# ScalarPoint(*i)
parse_scalar_point(i, value_converter=value_converter)
for i in data["values"]
]
)
assert resp["status"] == "success", resp

data = resp["data"]
assert data["resultType"] == "matrix", resp

def generic_parse_query_range_response(
resp: QueryRangeResponse,
value_converter: Converter[T],
) -> QueryRangeResult[T]:
if resp["status"] == "error":
parse_error(resp)
return self.parse_matrix(data)

assert resp["status"] == "success", resp
def parse_vector(self, data: VectorData) -> InstantVector[T]:
return InstantVector(series=[
self.parse_instant_series(i) for i in data["result"]
])

data = resp["data"]
assert data["resultType"] == "matrix", resp
def parse_matrix(self, data: MatrixData) -> Matrix[T]:
return Matrix(series=[
self.parse_range_series(i) for i in data["result"]
])

return parse_matrix(data, value_converter=value_converter)
def parse_scalar(self, data: ScalarData) -> ScalarPoint[T]:
return self.parse_scalar_point(data["result"])

def parse_scalar_point(self, data: ScalarPointData) -> ScalarPoint[T]:
t, v = data
return ScalarPoint(t, self.parse_value(v))

def parse_vector(
data: VectorData,
value_converter: Converter[T],
) -> InstantVector[T]:
return InstantVector(series=[
parse_instant_series(i, value_converter=value_converter)
for i in data["result"]
])
def parse_string(self, data: StringData) -> StringPoint:
return StringPoint(*data["result"])

def parse_query_value_response(self, resp: QueryResponse) -> T:
if resp["status"] == "error":
self.parse_error(resp)

def parse_matrix(data: MatrixData, value_converter: Converter[T]) -> Matrix[T]:
return Matrix(series=[
parse_range_series(i, value_converter=value_converter)
for i in data["result"]
])
assert resp["status"] == "success", resp

data = resp["data"]

def parse_scalar(
data: ScalarData, value_converter: Converter[T],
) -> ScalarPoint[T]:
return parse_scalar_point(data["result"], value_converter)
if data["resultType"] == "string":
return self.parse_value(data["result"][1])

if data["resultType"] == "scalar":
return self.parse_value(data["result"][1])

def parse_scalar_point(
data: ScalarPointData,
value_converter: Converter[T],
) -> ScalarPoint[T]:
t, v = data
return ScalarPoint(t, value_converter(v))
if data["resultType"] == "vector":
series_count = len(data["result"])
if series_count != 1:
raise ParseError(f"series count incorrect: {series_count}")

return self.parse_value(data["result"][0]["value"][1])

def parse_string(data: StringData) -> StringPoint:
return StringPoint(*data["result"])


def generic_parse_query_value_response(
resp: QueryResponse,
value_converter: Converter[T],
) -> T:
if resp["status"] == "error":
parse_error(resp)

assert resp["status"] == "success", resp

data = resp["data"]

if data["resultType"] == "string":
return value_converter(data["result"][1])

if data["resultType"] == "scalar":
return value_converter(data["result"][1])

if data["resultType"] == "vector":
series_count = len(data["result"])
if series_count != 1:
raise ParseError(f"series count incorrect: {series_count}")

return value_converter(data["result"][0]["value"][1])

raise ParseError(f'unknown result type: {data["resultType"]}')
raise ParseError(f'unknown result type: {data["resultType"]}')


def make_parser(
origin_parser: Callable[
[ResponseType, Converter[ValueType]],
ResultType
],
value_converter: Converter[ValueType],
) -> Callable[[ResponseType], ResultType]:
def parser(resp: ResponseType) -> ResultType:
return origin_parser(resp, value_converter)
) -> BaseParser[ValueType]:
class Parser(BaseParser[ValueType]):
def parse_value(self, value: str) -> ValueType:
return value_converter(value)

return parser
return Parser()
5 changes: 3 additions & 2 deletions tests/test_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from drow.annotation import SuccessResponse, VectorData
from drow.model import InstantVector
from drow.parser import generic_parse_query_response
from drow.parser import make_parser
from drow.converter import convert_to_decimal


Expand All @@ -23,7 +23,8 @@ def test_convert_to_decimal(self) -> None:
],
},
}
parsed = generic_parse_query_response(resp, convert_to_decimal)
parser = make_parser(convert_to_decimal)
parsed = parser.parse_query_response(resp)
assert isinstance(parsed, InstantVector)
value = parsed.series[0].value.value

Expand Down
29 changes: 9 additions & 20 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,13 @@
InstantVector, Matrix,
)
from drow.parser import (
generic_parse_query_response,
generic_parse_query_range_response,
generic_parse_query_value_response,
PrometheusError,
ParseError,
make_parser,
)
from drow.converter import no_op

parse_query_response = make_parser(
generic_parse_query_response, no_op
)
parse_query_range_response = make_parser(
generic_parse_query_range_response, no_op
)
parse_query_value_response = make_parser(
generic_parse_query_value_response, no_op
)
parser = make_parser(no_op)


class TestParser(TestCase):
Expand All @@ -51,7 +40,7 @@ def test_success_vector(self) -> None:
],
},
}
parsed = parse_query_response(resp)
parsed = parser.parse_query_response(resp)
assert isinstance(parsed, InstantVector)
self.assertEqual(len(parsed.series), 2)

Expand Down Expand Up @@ -92,7 +81,7 @@ def test_success_matrix(self) -> None:
],
},
}
parsed = parse_query_range_response(resp)
parsed = parser.parse_query_range_response(resp)
assert isinstance(parsed, Matrix)
self.assertEqual(len(parsed.series), 2)

Expand All @@ -113,7 +102,7 @@ def test_success_scalar(self) -> None:
"status": "success",
"data": {"resultType": "scalar", "result": (1739529069.829, "5")},
}
parsed = parse_query_response(resp)
parsed = parser.parse_query_response(resp)
assert isinstance(parsed, ScalarPoint)
self.assertEqual(parsed.value, "5")

Expand All @@ -124,7 +113,7 @@ def test_success_string(self) -> None:
"resultType": "string", "result": (1739529105.401, "foo")
},
}
parsed = parse_query_response(resp)
parsed = parser.parse_query_response(resp)
assert isinstance(parsed, StringPoint)
self.assertEqual(parsed.value, "foo")

Expand All @@ -136,14 +125,14 @@ def test_error(self) -> None:
}

with self.assertRaises(PrometheusError):
parse_query_response(resp)
parser.parse_query_response(resp)

def test_scalar_value(self) -> None:
resp: SuccessResponse[ScalarData] = {
"status": "success",
"data": {"resultType": "scalar", "result": (1739529069.829, "5")},
}
self.assertEqual(parse_query_value_response(resp), "5")
self.assertEqual(parser.parse_query_value_response(resp), "5")

def test_vector_value(self) -> None:
resp: SuccessResponse[VectorData] = {
Expand All @@ -160,7 +149,7 @@ def test_vector_value(self) -> None:
],
},
}
self.assertEqual(parse_query_value_response(resp), "6")
self.assertEqual(parser.parse_query_value_response(resp), "6")

def test_too_many_series_when_parse_value(self) -> None:
resp: SuccessResponse[VectorData] = {
Expand All @@ -184,4 +173,4 @@ def test_too_many_series_when_parse_value(self) -> None:
},
}
with self.assertRaises(ParseError):
parse_query_value_response(resp)
parser.parse_query_value_response(resp)