Skip to content

Commit 1d1d608

Browse files
authored
Merge pull request #26 from MeteoSwiss-APN/feature/dynamodb
Implement dynamodb based request and metric stores
2 parents cea8d9d + 5a076a3 commit 1d1d608

File tree

7 files changed

+577
-2
lines changed

7 files changed

+577
-2
lines changed
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
#
2+
# Copyright 2024 European Centre for Medium-Range Weather Forecasts (ECMWF)
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
# In applying this licence, ECMWF does not waive the privileges and immunities
17+
# granted to it by virtue of its status as an intergovernmental organisation nor
18+
# does it submit to any jurisdiction.
19+
#
20+
21+
import logging
22+
import operator
23+
import warnings
24+
from decimal import Decimal
25+
from enum import Enum
26+
from functools import reduce
27+
28+
import boto3
29+
import botocore
30+
import botocore.exceptions
31+
from boto3.dynamodb.conditions import Attr, Key
32+
33+
from ..metric import (
34+
CacheInfo,
35+
Metric,
36+
MetricType,
37+
QueueInfo,
38+
RequestStatusChange,
39+
StorageInfo,
40+
WorkerInfo,
41+
WorkerStatusChange,
42+
)
43+
from . import MetricStore
44+
45+
logger = logging.getLogger(__name__)
46+
47+
48+
METRIC_TYPE_CLASS_MAP = {
49+
MetricType.WORKER_STATUS_CHANGE: WorkerStatusChange,
50+
MetricType.WORKER_INFO: WorkerInfo,
51+
MetricType.REQUEST_STATUS_CHANGE: RequestStatusChange,
52+
MetricType.STORAGE_INFO: StorageInfo,
53+
MetricType.CACHE_INFO: CacheInfo,
54+
MetricType.QUEUE_INFO: QueueInfo,
55+
}
56+
57+
58+
def _iter_items(fn, **params):
59+
while True:
60+
response = fn(**params)
61+
for item in response["Items"]:
62+
yield item
63+
if "LastEvaluatedKey" not in response:
64+
break
65+
params["ExclusiveStartKey"] = response["LastEvaluatedKey"]
66+
67+
68+
def _make_query(**kwargs):
69+
return {
70+
key: value.value if isinstance(value, Enum) else value for key, value in kwargs.items() if value is not None
71+
}
72+
73+
74+
def _visit(obj, fn):
75+
if isinstance(obj, dict):
76+
return {key: _visit(value, fn) for key, value in obj.items()}
77+
if isinstance(obj, list):
78+
return [_visit(value, fn) for value in obj]
79+
return fn(obj)
80+
81+
82+
def _convert_numbers(obj, reverse=False):
83+
def fn(item):
84+
if not reverse and isinstance(item, float):
85+
return Decimal(item)
86+
elif reverse and isinstance(item, Decimal):
87+
return float(item)
88+
return item
89+
90+
return _visit(obj, fn)
91+
92+
93+
def _load(item):
94+
metric_type = Metric.deserialize_slot("type", item["type"])
95+
cls = METRIC_TYPE_CLASS_MAP[metric_type]
96+
return cls(from_dict=_convert_numbers(item, reverse=True))
97+
98+
99+
def _dump(metric):
100+
item = _convert_numbers(metric.serialize())
101+
if "request_id" in item and item["request_id"] is None:
102+
del item["request_id"] # index hash keys are not nullable
103+
return item
104+
105+
106+
def _create_table(dynamodb, table_name):
107+
try:
108+
kwargs = {
109+
"AttributeDefinitions": [
110+
{"AttributeName": "uuid", "AttributeType": "S"},
111+
{"AttributeName": "request_id", "AttributeType": "S"},
112+
],
113+
"TableName": table_name,
114+
"KeySchema": [{"AttributeName": "uuid", "KeyType": "HASH"}],
115+
"GlobalSecondaryIndexes": [
116+
{
117+
"IndexName": "request-index",
118+
"KeySchema": [{"AttributeName": "request_id", "KeyType": "HASH"}],
119+
"Projection": {"ProjectionType": "ALL"},
120+
},
121+
],
122+
"BillingMode": "PAY_PER_REQUEST",
123+
}
124+
table = dynamodb.create_table(**kwargs)
125+
table.wait_until_exists()
126+
except dynamodb.meta.client.exceptions.ResourceInUseException:
127+
pass
128+
129+
130+
class DynamoDBMetricStore(MetricStore):
131+
def __init__(self, config=None):
132+
if config is None:
133+
config = {}
134+
135+
endpoint_url = config.get("endpoint_url")
136+
region = config.get("region")
137+
table_name = config.get("table_name", "metrics")
138+
139+
dynamodb = boto3.resource("dynamodb", region_name=region, endpoint_url=endpoint_url)
140+
client = dynamodb.meta.client
141+
self.table = dynamodb.Table(table_name)
142+
143+
try:
144+
response = client.describe_table(TableName=table_name)
145+
if response["Table"]["TableStatus"] != "ACTIVE":
146+
raise RuntimeError(f"DynamoDB table {table_name} is not active.")
147+
except client.exceptions.ResourceNotFoundException:
148+
_create_table(dynamodb, table_name)
149+
150+
def get_type(self):
151+
return "dynamodb"
152+
153+
def add_metric(self, metric):
154+
try:
155+
self.table.put_item(Item=_dump(metric), ConditionExpression=Attr("uuid").not_exists())
156+
except botocore.exceptions.ClientError as e:
157+
if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
158+
raise ValueError("Request already exists in request store") from e
159+
raise
160+
161+
def remove_metric(self, uuid):
162+
try:
163+
self.table.delete_item(Key={"uuid": str(uuid)}, ConditionExpression=Attr("uuid").exists())
164+
except botocore.exceptions.ClientError as e:
165+
if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
166+
raise KeyError("Request does not exist in request store") from e
167+
raise
168+
169+
def get_metric(self, uuid):
170+
response = self.table.get_item(Key={"uuid": str(uuid)})
171+
if "Item" in response:
172+
return _load(response["Item"])
173+
174+
def get_metrics(self, ascending=None, descending=None, limit=None, request_id=None, **kwargs):
175+
if ascending is not None and descending is not None:
176+
raise ValueError("Cannot sort by ascending and descending at the same time.")
177+
178+
if request_id is not None:
179+
fn = self.table.query
180+
params = {
181+
"IndexName": "request-index",
182+
"KeyConditionExpression": Key("request_id").eq(request_id),
183+
}
184+
else:
185+
fn = self.table.scan
186+
params = {}
187+
188+
if limit is not None:
189+
params["Limit"] = limit
190+
191+
if query := _make_query(**kwargs):
192+
params["FilterExpression"] = reduce(operator.__and__, (Attr(key).eq(value) for key, value in query.items()))
193+
194+
items = (_load(item) for item in _iter_items(fn, **params))
195+
if ascending is not None:
196+
return sorted(items, key=lambda item: getattr(item, ascending))
197+
if descending is not None:
198+
return sorted(items, key=lambda item: getattr(item, descending), reverse=True)
199+
return list(items)
200+
201+
def update_metric(self, metric):
202+
self.table.put_item(Item=_dump(metric))
203+
204+
def wipe(self):
205+
warnings.warn("wipe is not implemented for DynamoDBMetricStore")
206+
207+
def collect_metric_info(self):
208+
return {}

polytope_server/common/metric_store/metric_store.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def collect_metric_info(
6868
"""Collect dictionary of metrics"""
6969

7070

71-
type_to_class_map = {"mongodb": "MongoMetricStore"}
71+
type_to_class_map = {"mongodb": "MongoMetricStore", "dynamodb": "DynamoDBMetricStore"}
7272

7373

7474
def create_metric_store(metric_store_config=None):

polytope_server/common/metric_store/mongodb_metric_store.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939

4040
class MongoMetricStore(MetricStore):
4141
def __init__(self, config=None):
42+
if config is None:
43+
config = {}
44+
4245
uri = config.get("uri", "mongodb://localhost:27017")
4346
metric_collection = config.get("collection", "metrics")
4447

0 commit comments

Comments
 (0)