From 590c9bbb57881cb1d0506b0e17ec1303701e66fe Mon Sep 17 00:00:00 2001 From: sametd <samet.demir@ecmwf.int> Date: Mon, 13 Jan 2025 10:31:59 +0000 Subject: [PATCH] renamed some endpoints and added /telemetry/v1 as prefix --- polytope_server/telemetry/handlers.py | 121 +++++++++++++++----------- 1 file changed, 69 insertions(+), 52 deletions(-) diff --git a/polytope_server/telemetry/handlers.py b/polytope_server/telemetry/handlers.py index 5c06026..c17e34e 100644 --- a/polytope_server/telemetry/handlers.py +++ b/polytope_server/telemetry/handlers.py @@ -47,26 +47,36 @@ logger = logging.getLogger(__name__) -router = APIRouter() +# Prefix for all telemetry endpoints +router = APIRouter(prefix="/telemetry/v1") -@router.get("/telemetry/v1", summary="List available telemetry endpoints") +@router.get("/", summary="List API routes (optional)") async def list_endpoints(): - return ["test", "summary", "all", "requests", "workers"] + """ + Optional 'index' endpoint for enumerating possible sub-routes. + """ + return ["health", "status", "requests", "users/{user_id}/requests", "workers", "report", "metrics"] -@router.get("/telemetry/v1/test", summary="Check server status") -async def test(): +@router.get("/health", summary="Health check endpoint") +async def health(): + """ + Simple endpoint to verify server is up and running. + """ return {"message": "Polytope telemetry server is alive"} -@router.get("/telemetry/v1/summary", summary="Get service status") +@router.get("/status", summary="Get overall service status") async def service_status( request_store=Depends(get_request_store), staging=Depends(get_staging), auth=Depends(get_auth), metric_store=Depends(get_metric_store), ): + """ + Returns status or metrics for core services/stores. + """ return { "request_store": request_store.collect_metric_info(), "staging": staging.collect_metric_info(), @@ -75,13 +85,16 @@ async def service_status( } -@router.get("/telemetry/v1/requests", summary="Get all requests") +@router.get("/requests", summary="Retrieve requests") async def all_requests( status: Optional[StatusEnum] = Query(None, description="Filter requests by status"), id: Optional[str] = Query(None, description="Filter requests by ID"), request_store=Depends(get_request_store), metric_store=Depends(get_metric_store), ): + """ + Fetch a list of requests. Can filter by status and/or ID. + """ active_statuses = { StatusEnum.ACTIVE: [ StatusEnum.WAITING, @@ -91,25 +104,23 @@ async def all_requests( ] } - # Fetch requests based on status if status == StatusEnum.ACTIVE: statuses = active_statuses[status] elif status: statuses = [status] else: - # If no status is provided, fetch all requests without filtering by status statuses = [] + # Fetch requests from the store user_requests = [] if statuses: - # Fetch requests for each status in the list for status_filter in statuses: query = {"status": status_filter, "id": id} user_requests += request_store.get_requests(**query) else: - # Fetch all requests without status filter user_requests = request_store.get_requests(id=id) + # Serialize and possibly attach metrics response_message = [] for request in user_requests: serialized_request = request.serialize() @@ -118,11 +129,10 @@ async def all_requests( metrics = metric_store.get_metrics(type=MetricType.REQUEST_STATUS_CHANGE, request_id=id) serialized_request["trace"] = [metric.serialize() for metric in metrics] + # Obfuscate user details serialized_request["user"]["details"] = "**hidden**" - - # Check for attributes and API key - if config.get("telemetry", {}).get("obfuscate_apikeys", False) and "attributes" in serialized_request["user"]: - attributes = serialized_request["user"]["attributes"] + if config.get("telemetry", {}).get("obfuscate_apikeys", False): + attributes = serialized_request["user"].get("attributes", {}) if "ecmwf-apikey" in attributes: attributes["ecmwf-apikey"] = obfuscate_apikey(attributes["ecmwf-apikey"]) @@ -131,14 +141,17 @@ async def all_requests( return response_message -@router.get("/telemetry/v1/requests/user/{user_id}", summary="Get all requests for a user") +@router.get("/users/{user_id}/requests", summary="Get requests by user") async def user_requests( user_id: str, - status: Optional[StatusEnum] = Query(None, description="Filter requests by status"), - id: Optional[str] = Query(None, description="Filter requests by ID"), + status: Optional[StatusEnum] = Query(None, description="Filter by status"), + id: Optional[str] = Query(None, description="Filter by ID"), request_store=Depends(get_request_store), metric_store=Depends(get_metric_store), ): + """ + Get all requests for a given user, optionally filtered by status or ID. + """ active_statuses = { StatusEnum.ACTIVE: [ StatusEnum.WAITING, @@ -148,12 +161,9 @@ async def user_requests( ] } - # TODO: implement more robust user fetching - # Now we just fetch all requests and filter by user_id - # Fetch all requests for the user user_requests = request_store.get_requests(id=id) - # Apply status filtering + # Filter by status if provided if status == StatusEnum.ACTIVE: statuses = active_statuses[status] elif status: @@ -162,24 +172,26 @@ async def user_requests( statuses = [] if statuses: - user_requests = [request for request in user_requests if request.status in statuses] + user_requests = [r for r in user_requests if r.status in statuses] + # Filter by user ID filtered_requests = [] for request in user_requests: if request.serialize()["user"]["id"] == user_id: filtered_requests.append(request) - # Serialize and enrich with metrics + # Serialize and attach metrics response_message = [] for request in filtered_requests: serialized_request = request.serialize() + metrics = metric_store.get_metrics(type=MetricType.REQUEST_STATUS_CHANGE, request_id=user_id) - serialized_request["metrics"] = [metric.serialize() for metric in metrics] - serialized_request["user"]["details"] = "**hidden**" + serialized_request["metrics"] = [m.serialize() for m in metrics] - # Check for attributes and API key - if config.get("telemetry", {}).get("obfuscate_apikeys", False) and "attributes" in serialized_request["user"]: - attributes: Dict[str, Any] = serialized_request["user"]["attributes"] + # Hide sensitive info + serialized_request["user"]["details"] = "**hidden**" + if config.get("telemetry", {}).get("obfuscate_apikeys", False): + attributes: Dict[str, Any] = serialized_request["user"].get("attributes", {}) if "ecmwf-apikey" in attributes: attributes["ecmwf-apikey"] = obfuscate_apikey(attributes["ecmwf-apikey"]) @@ -188,36 +200,44 @@ async def user_requests( return response_message -@router.get("/telemetry/v1/workers", summary="Get active workers") +@router.get("/workers", summary="Get workers information") async def active_workers( - uuid: Optional[str] = Query(None, description="Filter workers by UUID"), - host: Optional[str] = Query(None, description="Filter workers by host"), + uuid: Optional[str] = Query(None, description="Filter by worker UUID"), + host: Optional[str] = Query(None, description="Filter by host name"), metric_store=Depends(get_metric_store), ): + """ + Retrieve info about active workers from the metric store. + """ if not metric_store: raise HTTPException(status_code=500, detail="Metric store is unavailable.") query = {"uuid": uuid, "host": host, "type": MetricType.WORKER_INFO} worker_statuses = metric_store.get_metrics(**query) - response_message = [] + response = [] for worker in worker_statuses: serialized_worker = worker.serialize(ndigits=2) + # Show original timestamp serialized_worker["timestamp_served"] = worker.timestamp - response_message.append(serialized_worker) + response.append(serialized_worker) - return response_message + return response -@router.get("/telemetry/v1/all", summary="Get all metrics and information") -async def all_metrics( +@router.get("/report", summary="Get a full overview") +async def full_telemetry_report( request_store=Depends(get_request_store), staging=Depends(get_staging), auth=Depends(get_auth), metric_store=Depends(get_metric_store), ): + """ + Retrieves an aggregated 'big picture': service status, + active requests, active workers, etc. + """ # Service status - service_status = { + service_status_data = { "request_store": request_store.collect_metric_info(), "staging": staging.collect_metric_info(), "auth": auth.collect_metric_info(), @@ -232,8 +252,8 @@ async def all_metrics( StatusEnum.QUEUED, StatusEnum.PROCESSING, ] - for status in active_statuses: - query = {"status": status} + for st in active_statuses: + query = {"status": st} active_requests += request_store.get_requests(**query) # Active workers @@ -241,31 +261,27 @@ async def all_metrics( if metric_store: worker_statuses = metric_store.get_metrics(type=MetricType.WORKER_INFO) - # Combine all information - response_message = { - "service_status": service_status, - "active_requests": [request.serialize() for request in active_requests], - "active_workers": [worker.serialize(ndigits=2) for worker in worker_statuses], + # Combine + return { + "service_status": service_status_data, + "active_requests": [r.serialize() for r in active_requests], + "active_workers": [w.serialize(ndigits=2) for w in worker_statuses], } - return response_message - -@router.get("/telemetry/v1/usage", summary="Get usage metrics") +@router.get("/metrics", summary="Retrieve usage metrics") async def usage_metrics( format: str = Query("prometheus", description="Output format: prometheus or json"), metric_store=Depends(get_metric_store), ): """ - Endpoint to expose usage metrics in Prometheus or JSON format. + Endpoint exposing usage metrics in various formats. """ try: - # Ensure telemetry usage is enabled if not is_usage_enabled(): raise TelemetryUsageDisabled("Telemetry usage is disabled") now = datetime.now(timezone.utc) - # Intentionally using seconds here as this cache should be short-lived cache_expiry_seconds = config.get("telemetry", {}).get("usage", {}).get("cache_expiry_seconds", 30) # Fetch user requests @@ -279,7 +295,8 @@ async def usage_metrics( # Calculate metrics metrics = calculate_usage_metrics(user_requests, time_frames, now) - # Format and return output + + # Format output return format_output(metrics, time_frames, format) except TelemetryUsageDisabled as e: