Skip to content

Commit 172bb43

Browse files
authored
Merge branch 'main' into feature/changing_grids_dt
2 parents 149afbd + f1cb313 commit 172bb43

File tree

12 files changed

+667
-80
lines changed

12 files changed

+667
-80
lines changed

polytope_server/common/metric.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ def __init__(self, **kwargs):
131131

132132
class RequestStatusChange(Metric):
133133

134-
__slots__ = ["host", "status", "request_id"]
134+
__slots__ = ["host", "status", "request_id", "user_id"]
135135

136136
def __init__(self, **kwargs):
137137
super().__init__(type=MetricType.REQUEST_STATUS_CHANGE, host=socket.gethostname(), **kwargs)

polytope_server/common/metric_store/mongodb_metric_store.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,22 @@ def add_metric(self, metric):
7878
raise ValueError("Metric already exists in metric store")
7979
self.store.insert_one(metric.serialize())
8080

81-
def remove_metric(self, uuid):
81+
def remove_metric(self, uuid, include_processed=False):
    """
    Remove the metric with the given UUID from the store.

    A metric whose stored ``status`` is ``"processed"`` is left in place
    unless ``include_processed`` is true.

    :param uuid: value matched against the ``uuid`` field of stored metrics.
    :param include_processed: when true, delete the metric even if its
        status is ``"processed"``.
    :raises KeyError: if no metric with this UUID exists, including the case
        where it disappears between the lookup and the delete.
    """
    metric = self.store.find_one({"uuid": uuid})
    if metric is None:
        raise KeyError("Metric does not exist in request store")

    # Read the status with .get(): a stored document without a "status"
    # field is treated as "not processed" instead of failing with a
    # KeyError that callers would mistake for "metric does not exist".
    if not include_processed and metric.get("status") == "processed":
        logging.info("Skipping removal of metric with UUID %s as it has status 'processed'", uuid)
        return

    # The document may have been deleted since the lookup above, so the
    # result of the delete is checked as well.
    result = self.store.find_one_and_delete({"uuid": uuid})
    if result is None:
        raise KeyError("Metric does not exist in request store")

polytope_server/common/request_store/mongodb_request_store.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ def add_request(self, request):
6666
self.store.insert_one(request.serialize())
6767

6868
if self.metric_store:
69-
self.metric_store.add_metric(RequestStatusChange(request_id=request.id, status=request.status))
69+
self.metric_store.add_metric(
70+
RequestStatusChange(request_id=request.id, status=request.status, user_id=request.user.id)
71+
)
7072

7173
logging.info("Request ID {} status set to {}.".format(request.id, request.status))
7274

@@ -144,7 +146,9 @@ def update_request(self, request):
144146
)
145147

146148
if self.metric_store:
147-
self.metric_store.add_metric(RequestStatusChange(request_id=request.id, status=request.status))
149+
self.metric_store.add_metric(
150+
RequestStatusChange(request_id=request.id, status=request.status, user_id=request.user.id)
151+
)
148152

149153
logging.info("Request ID {} status set to {}.".format(request.id, request.status))
150154

@@ -154,7 +158,9 @@ def wipe(self):
154158
if self.metric_store:
155159
res = self.get_requests()
156160
for i in res:
157-
self.metric_store.remove_metric(type=MetricType.REQUEST_STATUS_CHANGE, request_id=i.id)
161+
self.metric_store.remove_metric(
162+
type=MetricType.REQUEST_STATUS_CHANGE, request_id=i.id, include_processed=True
163+
)
158164

159165
self.database.drop_collection(self.store.name)
160166

polytope_server/garbage-collector/garbage_collector.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import time
2424
from datetime import datetime, timedelta, timezone
2525

26+
from ..common.metric_store import create_metric_store
2627
from ..common.request import Status
2728
from ..common.request_store import create_request_store
2829
from ..common.staging import create_staging
@@ -36,9 +37,11 @@ def __init__(self, config):
3637
s_interval = gc_config.get("interval", "60s")
3738
s_threshold = gc_config.get("threshold", "10G")
3839
s_age = gc_config.get("age", "24h")
40+
s_metric_age = gc_config.get("metric_age", "24h")
3941
self.interval = parse_time(s_interval).total_seconds()
4042
self.threshold = parse_bytes(s_threshold)
4143
self.age = parse_time(s_age)
44+
self.metric_age = parse_time(s_metric_age)
4245

4346
logging.info(
4447
"Garbage collector initialized:\n Interval: {} ({} secs) \n \
@@ -52,12 +55,14 @@ def __init__(self, config):
5255
)
5356
)
5457

55-
self.request_store = create_request_store(config.get("request_store"))
58+
self.request_store = create_request_store(config.get("request_store"), config.get("metric_store"))
5659
self.staging = create_staging(config.get("staging"))
60+
self.metric_store = create_metric_store(config.get("metric_store"))
5761

5862
def run(self):
5963
while not time.sleep(self.interval):
6064
self.remove_old_requests()
65+
self.remove_old_metrics()
6166
self.remove_dangling_data()
6267
self.remove_by_size()
6368

@@ -80,6 +85,18 @@ def remove_old_requests(self):
8085
logging.info(f"Removing old request but data {data_name} not found in staging.")
8186
self.request_store.remove_request(r.id)
8287

88+
def remove_old_metrics(self):
    """Remove metrics whose timestamp is older than ``self.metric_age``.

    Every metric returned by ``self.metric_store.get_metrics()`` is checked;
    those older than the cutoff are removed with ``include_processed=True``.
    A metric that has already disappeared from the store is logged and
    skipped so the rest of the pass still runs.
    """
    cutoff = datetime.now(timezone.utc) - self.metric_age

    for m in self.metric_store.get_metrics():
        if datetime.fromtimestamp(m.timestamp, tz=timezone.utc) >= cutoff:
            continue

        logging.info("Deleting metric {} because it is too old.".format(m.uuid))
        try:
            self.metric_store.remove_metric(m.uuid, include_processed=True)
        except KeyError:
            # The store raises KeyError for a metric that no longer exists
            # (e.g. removed since get_metrics() was called); that must not
            # abort the whole collection pass.
            logging.info("Metric %s was already removed from the metric store.", m.uuid)
99+
83100
def remove_dangling_data(self):
84101
"""As a failsafe, removes data which has no corresponding request."""
85102
all_objects = self.staging.list()

polytope_server/telemetry/dependencies.py

Lines changed: 86 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,41 @@
1818
# does it submit to any jurisdiction.
1919
#
2020

21+
22+
import logging
23+
24+
from fastapi import HTTPException, Request, status
25+
26+
from ..common.auth import AuthHelper
27+
from ..common.authentication.plain_authentication import PlainAuthentication
28+
from ..common.exceptions import ForbiddenRequest
29+
from ..common.metric_store import create_metric_store
30+
from ..common.request_store import create_request_store
31+
from ..common.staging import create_staging
2132
from .config import config
33+
from .helpers import TelemetryLogSuppressor
34+
35+
logger = logging.getLogger(__name__)
36+
37+
# This is to avoid spamming the logs with the same auth message
38+
log_suppression_ttl = config.get("telemetry", {}).get("basic_auth", {}).get("log_suppression_ttl", 300)
39+
_telemetry_log_suppressor = TelemetryLogSuppressor(log_suppression_ttl)
40+
41+
plain_auth = PlainAuthentication(
42+
name="telemetry_basic_auth",
43+
realm="telemetry_realm",
44+
config={"users": config.get("telemetry", {}).get("basic_auth", {}).get("users", [])},
45+
)
46+
47+
48+
def initialize_resources(config):
    """Create the shared telemetry resources and return them in a dict.

    The returned dict has the keys ``request_store``, ``staging``,
    ``metric_store`` and ``auth``. ``metric_store`` is ``None`` when the
    configuration has no (truthy) ``metric_store`` entry.
    """
    metric_store_config = config.get("metric_store")

    resources = {}
    resources["request_store"] = create_request_store(config.get("request_store"), metric_store_config)
    resources["staging"] = create_staging(config.get("staging"))
    if metric_store_config:
        resources["metric_store"] = create_metric_store(metric_store_config)
    else:
        resources["metric_store"] = None
    resources["auth"] = AuthHelper(config.config)
    return resources
2256

2357

2458
def get_settings():
@@ -31,26 +65,65 @@ def get_settings():
3165
}
3266

3367

34-
def get_request_store():
35-
from ..common.request_store import create_request_store
68+
def get_request_store(request: Request):
    """Return the ``request_store`` entry of ``request.app.state.resources``."""
    resources = request.app.state.resources
    return resources["request_store"]
70+
71+
72+
def get_staging(request: Request):
    """Return the ``staging`` entry of ``request.app.state.resources``."""
    resources = request.app.state.resources
    return resources["staging"]
3674

37-
return create_request_store(config.get("request_store"), config.get("metric_store"))
3875

76+
def get_metric_store(request: Request):
    """Return the ``metric_store`` entry of ``request.app.state.resources``."""
    resources = request.app.state.resources
    return resources["metric_store"]
3978

40-
def get_staging():
41-
from ..common.staging import create_staging
4279

43-
return create_staging(config.get("staging"))
80+
def get_auth(request: Request):
    """Return the ``auth`` entry of ``request.app.state.resources``."""
    resources = request.app.state.resources
    return resources["auth"]
4482

4583

46-
def get_metric_store():
47-
from ..common.metric_store import create_metric_store
84+
def metrics_auth(request: Request):
    """
    FastAPI dependency that guards telemetry endpoints with HTTP Basic Auth.

    - If ``telemetry.basic_auth.enabled`` is false or absent in the config,
      returns immediately without looking at the request.
    - Otherwise reads the ``Authorization`` header, requires the ``Basic``
      scheme (compared case-insensitively) and passes the encoded
      credentials to ``plain_auth.authenticate``.
    - Translates ``ForbiddenRequest`` into FastAPI's ``HTTPException``.

    :raises HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when
        the header is missing, uses another scheme, or the credentials are
        rejected.
    """
    basic_auth_cfg = config.get("telemetry", {}).get("basic_auth", {})
    if not basic_auth_cfg.get("enabled", False):
        # Basic Auth is disabled; skip credential checks
        return

    auth_header = request.headers.get("Authorization")
    if not auth_header:
        logger.warning("Missing Authorization header for telemetry.")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing Basic Auth credentials",
            headers={"WWW-Authenticate": "Basic"},
        )

    # HTTP authentication scheme names are case-insensitive, so a client
    # sending "basic ..." must be accepted exactly like "Basic ...".
    scheme, separator, encoded_creds = auth_header.partition(" ")
    if not separator or scheme.lower() != "basic":
        logger.warning("Invalid Auth scheme (expected Basic) for telemetry.")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid Auth scheme",
            headers={"WWW-Authenticate": "Basic"},
        )

    try:
        user = plain_auth.authenticate(encoded_creds)
    except ForbiddenRequest as e:
        # Ensure we never send an empty detail message
        detail_msg = str(e).strip() or "Invalid credentials"
        logger.warning("ForbiddenRequest: %s", detail_msg)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail_msg,
            headers={"WWW-Authenticate": "Basic"},
        ) from e

    # Authentication succeeded. Instead of logging directly every time,
    # let the log suppressor decide.
    _telemetry_log_suppressor.log_if_needed(user.id)
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#
2+
# Copyright 2022 European Centre for Medium-Range Weather Forecasts (ECMWF)
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
# In applying this licence, ECMWF does not waive the privileges and immunities
17+
# granted to it by virtue of its status as an intergovernmental organisation nor
18+
# does it submit to any jurisdiction.
19+
#
20+
21+
22+
class TelemetryError(Exception):
    """Root of the telemetry exception hierarchy."""


class TelemetryUsageDisabled(TelemetryError):
    """Telemetry usage is disabled."""


class RequestFetchError(TelemetryError):
    """Requests could not be fetched."""


class MetricCalculationError(TelemetryError):
    """A metric could not be calculated."""


class OutputFormatError(TelemetryError):
    """An invalid output format was requested."""


class TelemetryConfigError(TelemetryError):
    """The telemetry configuration is faulty."""


class TelemetryCacheError(TelemetryError):
    """Caching telemetry data failed."""


class TelemetryDataError(TelemetryError):
    """The telemetry data is faulty."""

polytope_server/telemetry/fastapi_handler.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,7 @@
2424

2525

2626
class FastAPIHandler:
27-
28-
def create_handler(
29-
self,
30-
request_store,
31-
staging,
32-
auth,
33-
metric_store,
34-
):
35-
app = FastAPI(title="Polytope Telemetry Service")
27+
def create_handler(self, lifespan=None):
    """Build the telemetry FastAPI application.

    :param lifespan: passed through unchanged as FastAPI's ``lifespan`` argument.
    :return: the ``FastAPI`` instance with ``router`` included.
    """
    application = FastAPI(title="Polytope Telemetry Service", lifespan=lifespan)
    application.include_router(router)
    return application

0 commit comments

Comments
 (0)