Skip to content

Commit

Permalink
Merge pull request #58 from ecmwf/feature/telemetry-usage-endpoint
Browse files Browse the repository at this point in the history
Feature/telemetry usage endpoint
  • Loading branch information
sametd authored Jan 15, 2025
2 parents 5661d52 + 9041537 commit 0c53653
Show file tree
Hide file tree
Showing 11 changed files with 666 additions and 79 deletions.
2 changes: 1 addition & 1 deletion polytope_server/common/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def __init__(self, **kwargs):

class RequestStatusChange(Metric):

__slots__ = ["host", "status", "request_id"]
__slots__ = ["host", "status", "request_id", "user_id"]

def __init__(self, **kwargs):
super().__init__(type=MetricType.REQUEST_STATUS_CHANGE, host=socket.gethostname(), **kwargs)
Expand Down
17 changes: 16 additions & 1 deletion polytope_server/common/metric_store/mongodb_metric_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,22 @@ def add_metric(self, metric):
raise ValueError("Metric already exists in metric store")
self.store.insert_one(metric.serialize())

def remove_metric(self, uuid):
def remove_metric(self, uuid, include_processed=False):
"""
Removes a metric with the given UUID. By default, it skips entries with status 'processed'.
"""
# Find the document
metric = self.store.find_one({"uuid": uuid})
if metric is None:
raise KeyError("Metric does not exist in request store")

# Skip removal if the status is 'processed' and include_processed is False
if metric["status"] == "processed" and not include_processed:
# Log skipping for better traceability
logging.info(f"Skipping removal of metric with UUID {uuid} as it has status 'processed'")
return

# Delete the metric
result = self.store.find_one_and_delete({"uuid": uuid})
if result is None:
raise KeyError("Metric does not exist in request store")
Expand Down
12 changes: 9 additions & 3 deletions polytope_server/common/request_store/mongodb_request_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ def add_request(self, request):
self.store.insert_one(request.serialize())

if self.metric_store:
self.metric_store.add_metric(RequestStatusChange(request_id=request.id, status=request.status))
self.metric_store.add_metric(
RequestStatusChange(request_id=request.id, status=request.status, user_id=request.user.id)
)

logging.info("Request ID {} status set to {}.".format(request.id, request.status))

Expand Down Expand Up @@ -144,7 +146,9 @@ def update_request(self, request):
)

if self.metric_store:
self.metric_store.add_metric(RequestStatusChange(request_id=request.id, status=request.status))
self.metric_store.add_metric(
RequestStatusChange(request_id=request.id, status=request.status, user_id=request.user.id)
)

logging.info("Request ID {} status set to {}.".format(request.id, request.status))

Expand All @@ -154,7 +158,9 @@ def wipe(self):
if self.metric_store:
res = self.get_requests()
for i in res:
self.metric_store.remove_metric(type=MetricType.REQUEST_STATUS_CHANGE, request_id=i.id)
self.metric_store.remove_metric(
type=MetricType.REQUEST_STATUS_CHANGE, request_id=i.id, include_processed=True
)

self.database.drop_collection(self.store.name)

Expand Down
19 changes: 18 additions & 1 deletion polytope_server/garbage-collector/garbage_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import time
from datetime import datetime, timedelta, timezone

from ..common.metric_store import create_metric_store
from ..common.request import Status
from ..common.request_store import create_request_store
from ..common.staging import create_staging
Expand All @@ -36,9 +37,11 @@ def __init__(self, config):
s_interval = gc_config.get("interval", "60s")
s_threshold = gc_config.get("threshold", "10G")
s_age = gc_config.get("age", "24h")
s_metric_age = gc_config.get("metric_age", "24h")
self.interval = parse_time(s_interval).total_seconds()
self.threshold = parse_bytes(s_threshold)
self.age = parse_time(s_age)
self.metric_age = parse_time(s_metric_age)

logging.info(
"Garbage collector initialized:\n Interval: {} ({} secs) \n \
Expand All @@ -52,12 +55,14 @@ def __init__(self, config):
)
)

self.request_store = create_request_store(config.get("request_store"))
self.request_store = create_request_store(config.get("request_store"), config.get("metric_store"))
self.staging = create_staging(config.get("staging"))
self.metric_store = create_metric_store(config.get("metric_store"))

def run(self):
while not time.sleep(self.interval):
self.remove_old_requests()
self.remove_old_metrics()
self.remove_dangling_data()
self.remove_by_size()

Expand All @@ -80,6 +85,18 @@ def remove_old_requests(self):
logging.info(f"Removing old request but data {data_name} not found in staging.")
self.request_store.remove_request(r.id)

def remove_old_metrics(self):
"""Removes metrics older than the configured time"""
now = datetime.now(timezone.utc)
cutoff = now - self.metric_age

metrics = self.metric_store.get_metrics()

for m in metrics:
if datetime.fromtimestamp(m.timestamp, tz=timezone.utc) < cutoff:
logging.info("Deleting metric {} because it is too old.".format(m.uuid))
self.metric_store.remove_metric(m.uuid, include_processed=True)

def remove_dangling_data(self):
"""As a failsafe, removes data which has no corresponding request."""
all_objects = self.staging.list()
Expand Down
99 changes: 86 additions & 13 deletions polytope_server/telemetry/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,41 @@
# does it submit to any jurisdiction.
#


import logging

from fastapi import HTTPException, Request, status

from ..common.auth import AuthHelper
from ..common.authentication.plain_authentication import PlainAuthentication
from ..common.exceptions import ForbiddenRequest
from ..common.metric_store import create_metric_store
from ..common.request_store import create_request_store
from ..common.staging import create_staging
from .config import config
from .helpers import TelemetryLogSuppressor

logger = logging.getLogger(__name__)

# This is to avoid spamming the logs with the same auth message
log_suppression_ttl = config.get("telemetry", {}).get("basic_auth", {}).get("log_suppression_ttl", 300)
_telemetry_log_suppressor = TelemetryLogSuppressor(log_suppression_ttl)

plain_auth = PlainAuthentication(
name="telemetry_basic_auth",
realm="telemetry_realm",
config={"users": config.get("telemetry", {}).get("basic_auth", {}).get("users", [])},
)


def initialize_resources(config):
"""Initialize and return all resources."""
return {
"request_store": create_request_store(config.get("request_store"), config.get("metric_store")),
"staging": create_staging(config.get("staging")),
"metric_store": create_metric_store(config.get("metric_store")) if config.get("metric_store") else None,
"auth": AuthHelper(config.config),
}


def get_settings():
Expand All @@ -31,26 +65,65 @@ def get_settings():
}


def get_request_store():
from ..common.request_store import create_request_store
def get_request_store(request: Request):
return request.app.state.resources["request_store"]


def get_staging(request: Request):
return request.app.state.resources["staging"]

return create_request_store(config.get("request_store"), config.get("metric_store"))

def get_metric_store(request: Request):
return request.app.state.resources["metric_store"]

def get_staging():
from ..common.staging import create_staging

return create_staging(config.get("staging"))
def get_auth(request: Request):
return request.app.state.resources["auth"]


def get_metric_store():
from ..common.metric_store import create_metric_store
def metrics_auth(request: Request):
"""
FastAPI dependency that:
- Reads the 'Authorization' header.
- If Basic Auth is disabled, returns immediately.
- If it's enabled, calls 'plain_auth.authenticate'.
- Translates 'ForbiddenRequest' -> FastAPI's HTTPException.
"""
basic_auth_cfg = config.get("telemetry", {}).get("basic_auth", {})
if not basic_auth_cfg.get("enabled", False):
# Basic Auth is disabled; skip credential checks
return

metric_store_config = config.get("metric_store")
return create_metric_store(metric_store_config) if metric_store_config else None
auth_header = request.headers.get("Authorization")
if not auth_header:
logger.warning("Missing Authorization header for telemetry.")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing Basic Auth credentials",
headers={"WWW-Authenticate": "Basic"},
)

if not auth_header.startswith("Basic "):
logger.warning("Invalid Auth scheme (expected Basic) for telemetry.")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid Auth scheme",
headers={"WWW-Authenticate": "Basic"},
)

def get_auth():
from ..common.auth import AuthHelper
encoded_creds = auth_header[len("Basic ") :]

return AuthHelper(config.config)
try:
user = plain_auth.authenticate(encoded_creds)
# If this succeeded, we have a valid user
# Instead of logging directly every time, let the log suppressor decide.
_telemetry_log_suppressor.log_if_needed(user.id)
except ForbiddenRequest as e:
# Ensure we never send an empty detail message
detail_msg = str(e).strip() or "Invalid credentials"
logger.warning(f"ForbiddenRequest: {detail_msg}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=detail_msg,
headers={"WWW-Authenticate": "Basic"},
)
67 changes: 67 additions & 0 deletions polytope_server/telemetry/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#
# Copyright 2022 European Centre for Medium-Range Weather Forecasts (ECMWF)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation nor
# does it submit to any jurisdiction.
#


class TelemetryError(Exception):
"""Base class for telemetry-related errors."""

pass


class TelemetryUsageDisabled(TelemetryError):
"""Raised when telemetry usage is disabled."""

pass


class RequestFetchError(TelemetryError):
"""Raised when fetching requests fails."""

pass


class MetricCalculationError(TelemetryError):
"""Raised when metric calculation fails."""

pass


class OutputFormatError(TelemetryError):
"""Raised when an invalid output format is requested."""

pass


class TelemetryConfigError(TelemetryError):
"""Raised when there is an issue with the telemetry configuration."""

pass


class TelemetryCacheError(TelemetryError):
"""Raised when there is an issue with caching telemetry data."""

pass


class TelemetryDataError(TelemetryError):
"""Raised when there is an issue with telemetry data."""

pass
11 changes: 2 additions & 9 deletions polytope_server/telemetry/fastapi_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,7 @@


class FastAPIHandler:

def create_handler(
self,
request_store,
staging,
auth,
metric_store,
):
app = FastAPI(title="Polytope Telemetry Service")
def create_handler(self, lifespan=None):
app = FastAPI(title="Polytope Telemetry Service", lifespan=lifespan)
app.include_router(router)
return app
Loading

0 comments on commit 0c53653

Please sign in to comment.