Skip to content

Commit

Permalink
renamed some endpoints and added /telemetry/v1 as prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
sametd committed Jan 13, 2025
1 parent 753ec56 commit 590c9bb
Showing 1 changed file with 69 additions and 52 deletions.
121 changes: 69 additions & 52 deletions polytope_server/telemetry/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,36 @@

logger = logging.getLogger(__name__)

router = APIRouter()
# Prefix for all telemetry endpoints
router = APIRouter(prefix="/telemetry/v1")


@router.get("/telemetry/v1", summary="List available telemetry endpoints")
@router.get("/", summary="List API routes (optional)")
async def list_endpoints():
return ["test", "summary", "all", "requests", "workers"]
"""
Optional 'index' endpoint for enumerating possible sub-routes.
"""
return ["health", "status", "requests", "users/{user_id}/requests", "workers", "report", "metrics"]


@router.get("/telemetry/v1/test", summary="Check server status")
async def test():
@router.get("/health", summary="Health check endpoint")
async def health():
"""
Simple endpoint to verify server is up and running.
"""
return {"message": "Polytope telemetry server is alive"}


@router.get("/telemetry/v1/summary", summary="Get service status")
@router.get("/status", summary="Get overall service status")
async def service_status(
request_store=Depends(get_request_store),
staging=Depends(get_staging),
auth=Depends(get_auth),
metric_store=Depends(get_metric_store),
):
"""
Returns status or metrics for core services/stores.
"""
return {
"request_store": request_store.collect_metric_info(),
"staging": staging.collect_metric_info(),
Expand All @@ -75,13 +85,16 @@ async def service_status(
}


@router.get("/telemetry/v1/requests", summary="Get all requests")
@router.get("/requests", summary="Retrieve requests")
async def all_requests(
status: Optional[StatusEnum] = Query(None, description="Filter requests by status"),
id: Optional[str] = Query(None, description="Filter requests by ID"),
request_store=Depends(get_request_store),
metric_store=Depends(get_metric_store),
):
"""
Fetch a list of requests. Can filter by status and/or ID.
"""
active_statuses = {
StatusEnum.ACTIVE: [
StatusEnum.WAITING,
Expand All @@ -91,25 +104,23 @@ async def all_requests(
]
}

# Fetch requests based on status
if status == StatusEnum.ACTIVE:
statuses = active_statuses[status]
elif status:
statuses = [status]
else:
# If no status is provided, fetch all requests without filtering by status
statuses = []

# Fetch requests from the store
user_requests = []
if statuses:
# Fetch requests for each status in the list
for status_filter in statuses:
query = {"status": status_filter, "id": id}
user_requests += request_store.get_requests(**query)
else:
# Fetch all requests without status filter
user_requests = request_store.get_requests(id=id)

# Serialize and possibly attach metrics
response_message = []
for request in user_requests:
serialized_request = request.serialize()
Expand All @@ -118,11 +129,10 @@ async def all_requests(
metrics = metric_store.get_metrics(type=MetricType.REQUEST_STATUS_CHANGE, request_id=id)
serialized_request["trace"] = [metric.serialize() for metric in metrics]

# Obfuscate user details
serialized_request["user"]["details"] = "**hidden**"

# Check for attributes and API key
if config.get("telemetry", {}).get("obfuscate_apikeys", False) and "attributes" in serialized_request["user"]:
attributes = serialized_request["user"]["attributes"]
if config.get("telemetry", {}).get("obfuscate_apikeys", False):
attributes = serialized_request["user"].get("attributes", {})
if "ecmwf-apikey" in attributes:
attributes["ecmwf-apikey"] = obfuscate_apikey(attributes["ecmwf-apikey"])

Expand All @@ -131,14 +141,17 @@ async def all_requests(
return response_message


@router.get("/telemetry/v1/requests/user/{user_id}", summary="Get all requests for a user")
@router.get("/users/{user_id}/requests", summary="Get requests by user")
async def user_requests(
user_id: str,
status: Optional[StatusEnum] = Query(None, description="Filter requests by status"),
id: Optional[str] = Query(None, description="Filter requests by ID"),
status: Optional[StatusEnum] = Query(None, description="Filter by status"),
id: Optional[str] = Query(None, description="Filter by ID"),
request_store=Depends(get_request_store),
metric_store=Depends(get_metric_store),
):
"""
Get all requests for a given user, optionally filtered by status or ID.
"""
active_statuses = {
StatusEnum.ACTIVE: [
StatusEnum.WAITING,
Expand All @@ -148,12 +161,9 @@ async def user_requests(
]
}

# TODO: implement more robust user fetching
# Now we just fetch all requests and filter by user_id
# Fetch all requests for the user
user_requests = request_store.get_requests(id=id)

# Apply status filtering
# Filter by status if provided
if status == StatusEnum.ACTIVE:
statuses = active_statuses[status]
elif status:
Expand All @@ -162,24 +172,26 @@ async def user_requests(
statuses = []

if statuses:
user_requests = [request for request in user_requests if request.status in statuses]
user_requests = [r for r in user_requests if r.status in statuses]

# Filter by user ID
filtered_requests = []
for request in user_requests:
if request.serialize()["user"]["id"] == user_id:
filtered_requests.append(request)

# Serialize and enrich with metrics
# Serialize and attach metrics
response_message = []
for request in filtered_requests:
serialized_request = request.serialize()

metrics = metric_store.get_metrics(type=MetricType.REQUEST_STATUS_CHANGE, request_id=user_id)
serialized_request["metrics"] = [metric.serialize() for metric in metrics]
serialized_request["user"]["details"] = "**hidden**"
serialized_request["metrics"] = [m.serialize() for m in metrics]

# Check for attributes and API key
if config.get("telemetry", {}).get("obfuscate_apikeys", False) and "attributes" in serialized_request["user"]:
attributes: Dict[str, Any] = serialized_request["user"]["attributes"]
# Hide sensitive info
serialized_request["user"]["details"] = "**hidden**"
if config.get("telemetry", {}).get("obfuscate_apikeys", False):
attributes: Dict[str, Any] = serialized_request["user"].get("attributes", {})
if "ecmwf-apikey" in attributes:
attributes["ecmwf-apikey"] = obfuscate_apikey(attributes["ecmwf-apikey"])

Expand All @@ -188,36 +200,44 @@ async def user_requests(
return response_message


@router.get("/telemetry/v1/workers", summary="Get active workers")
@router.get("/workers", summary="Get workers information")
async def active_workers(
uuid: Optional[str] = Query(None, description="Filter workers by UUID"),
host: Optional[str] = Query(None, description="Filter workers by host"),
uuid: Optional[str] = Query(None, description="Filter by worker UUID"),
host: Optional[str] = Query(None, description="Filter by host name"),
metric_store=Depends(get_metric_store),
):
"""
Retrieve info about active workers from the metric store.
"""
if not metric_store:
raise HTTPException(status_code=500, detail="Metric store is unavailable.")

query = {"uuid": uuid, "host": host, "type": MetricType.WORKER_INFO}
worker_statuses = metric_store.get_metrics(**query)

response_message = []
response = []
for worker in worker_statuses:
serialized_worker = worker.serialize(ndigits=2)
# Show original timestamp
serialized_worker["timestamp_served"] = worker.timestamp
response_message.append(serialized_worker)
response.append(serialized_worker)

return response_message
return response


@router.get("/telemetry/v1/all", summary="Get all metrics and information")
async def all_metrics(
@router.get("/report", summary="Get a full overview")
async def full_telemetry_report(
request_store=Depends(get_request_store),
staging=Depends(get_staging),
auth=Depends(get_auth),
metric_store=Depends(get_metric_store),
):
"""
Retrieves an aggregated 'big picture': service status,
active requests, active workers, etc.
"""
# Service status
service_status = {
service_status_data = {
"request_store": request_store.collect_metric_info(),
"staging": staging.collect_metric_info(),
"auth": auth.collect_metric_info(),
Expand All @@ -232,40 +252,36 @@ async def all_metrics(
StatusEnum.QUEUED,
StatusEnum.PROCESSING,
]
for status in active_statuses:
query = {"status": status}
for st in active_statuses:
query = {"status": st}
active_requests += request_store.get_requests(**query)

# Active workers
worker_statuses = []
if metric_store:
worker_statuses = metric_store.get_metrics(type=MetricType.WORKER_INFO)

# Combine all information
response_message = {
"service_status": service_status,
"active_requests": [request.serialize() for request in active_requests],
"active_workers": [worker.serialize(ndigits=2) for worker in worker_statuses],
# Combine
return {
"service_status": service_status_data,
"active_requests": [r.serialize() for r in active_requests],
"active_workers": [w.serialize(ndigits=2) for w in worker_statuses],
}

return response_message


@router.get("/telemetry/v1/usage", summary="Get usage metrics")
@router.get("/metrics", summary="Retrieve usage metrics")
async def usage_metrics(
format: str = Query("prometheus", description="Output format: prometheus or json"),
metric_store=Depends(get_metric_store),
):
"""
Endpoint to expose usage metrics in Prometheus or JSON format.
Endpoint exposing usage metrics in various formats.
"""
try:
# Ensure telemetry usage is enabled
if not is_usage_enabled():
raise TelemetryUsageDisabled("Telemetry usage is disabled")

now = datetime.now(timezone.utc)
# Intentionally using seconds here as this cache should be short-lived
cache_expiry_seconds = config.get("telemetry", {}).get("usage", {}).get("cache_expiry_seconds", 30)

# Fetch user requests
Expand All @@ -279,7 +295,8 @@ async def usage_metrics(

# Calculate metrics
metrics = calculate_usage_metrics(user_requests, time_frames, now)
# Format and return output

# Format output
return format_output(metrics, time_frames, format)

except TelemetryUsageDisabled as e:
Expand Down

0 comments on commit 590c9bb

Please sign in to comment.