1
- import json
1
+ import logging
2
2
from datetime import datetime , timedelta , timezone
3
- from typing import Any , Callable , Dict , List , Optional
3
+ from typing import Any , Dict , List , Optional
4
4
5
5
from fastapi import Response
6
- from prometheus_client import CollectorRegistry , Gauge
6
+ from fastapi .responses import JSONResponse
7
+ from prometheus_client import (
8
+ CONTENT_TYPE_LATEST ,
9
+ CollectorRegistry ,
10
+ Gauge ,
11
+ generate_latest ,
12
+ )
7
13
8
14
from .config import config
9
15
from .enums import StatusEnum
16
+ from .exceptions import (
17
+ MetricCalculationError ,
18
+ OutputFormatError ,
19
+ RequestFetchError ,
20
+ TelemetryConfigError ,
21
+ TelemetryDataError ,
22
+ )
23
+
24
+ logger = logging .getLogger (__name__ )
10
25
11
26
# Global cache for usage metrics. "data" holds the most recently fetched list
# of user requests and "timestamp" the UTC time at which it was fetched; both
# are None until the first successful fetch.
usage_metrics_cache: Dict[str, Any] = {"data": None, "timestamp": None}
13
28
14
29
15
def obfuscate_apikey(key: str) -> str:
    """Obfuscate an API key, keeping only the first and last 4 characters visible.

    Keys of 8 characters or fewer are masked entirely, since showing 8
    characters would reveal the whole key. The result always has the same
    length as the input.

    Args:
        key: The API key to mask.

    Returns:
        The masked key, e.g. ``"abcd****ijkl"`` for ``"abcdefghijkl"``.
    """
    if len(key) > 8:
        # No separators between the fields: the output must stay the same
        # length as the input and contain only key characters and '*'.
        return f"{key[:4]}{'*' * (len(key) - 8)}{key[-4:]}"
    return "*" * len(key)
18
35
19
36
20
def is_usage_enabled() -> bool:
    """Check if the telemetry usage endpoint is enabled in the config.

    Reads ``telemetry.usage.enabled`` from the module-level ``config`` and
    falls back to ``False`` when any level of that path is missing.

    Returns:
        The configured boolean flag.

    Raises:
        TelemetryConfigError: If the configured value is not a boolean, or if
            the configuration could not be read.
    """
    try:
        enabled = config.get("telemetry", {}).get("usage", {}).get("enabled", False)
    except Exception as e:
        logger.error(f"Error checking telemetry usage status: {e}")
        raise TelemetryConfigError("An error occurred while reading the telemetry configuration") from e

    # Validate outside the try block so the specific message is not replaced
    # by the generic "error while reading" one.
    if not isinstance(enabled, bool):
        message = "The 'enabled' field in the telemetry config must be a boolean"
        logger.error(f"Error checking telemetry usage status: {message}")
        raise TelemetryConfigError(message)
    return enabled
51
+
52
def get_usage_timeframes_from_config() -> List[Dict[str, Any]]:
    """Load timeframes from the telemetry configuration and generate metric details.

    Reads ``telemetry.usage.timeframes`` from the module-level ``config``; each
    entry is a positive number of days.

    Returns:
        One dict per configured timeframe with the keys ``name``, ``delta``
        (a ``timedelta``), ``request_metric_name``, ``user_metric_name``,
        ``request_metric_description`` and ``user_metric_description``.

    Raises:
        TelemetryConfigError: If no timeframes are configured, if an entry is
            not a positive integer, or if the configuration cannot be read.
    """
    try:
        raw_timeframes = config.get("telemetry", {}).get("usage", {}).get("timeframes", [])
        if not raw_timeframes:
            raise TelemetryConfigError("No timeframes defined in telemetry configuration")

        timeframes = []
        for days in raw_timeframes:
            # bool is a subclass of int, so True/False must be rejected
            # explicitly or they would yield names like "last_Trued".
            if isinstance(days, bool) or not isinstance(days, int) or days <= 0:
                raise TelemetryConfigError(f"Invalid timeframe value: {days}")
            timeframes.append(
                {
                    "name": f"last_{days}d",
                    "delta": timedelta(days=days),
                    "request_metric_name": f"polytope_requests_last_{days}d",
                    "user_metric_name": f"polytope_unique_users_last_{days}d",
                    "request_metric_description": f"Number of requests in the last {days} days",
                    "user_metric_description": f"Number of unique users in the last {days} days",
                }
            )
        return timeframes
    except TelemetryConfigError as e:
        # Keep the specific validation message instead of masking it below.
        logger.error(f"Error loading timeframes from config: {e}")
        raise
    except Exception as e:
        logger.error(f"Error loading timeframes from config: {e}")
        raise TelemetryConfigError(
            "An error occurred while reading telemetry timeframes from the config"
        ) from e
39
79
40
80
41
81
async def get_cached_usage_user_requests(
    status: Optional[StatusEnum],
    id: Optional[str],
    request_store,
    metric_store,
    fetch_function,
    cache_expiry_seconds: int,
) -> List[Dict[str, Any]]:
    """Return user requests from the cache, refreshing it via fetch_function when expired.

    Args:
        status: Forwarded to ``fetch_function`` as ``status``.
        id: Forwarded to ``fetch_function`` as ``id``.
        request_store: Forwarded to ``fetch_function`` as ``request_store``.
        metric_store: Forwarded to ``fetch_function`` as ``metric_store``.
        fetch_function: Awaitable callable invoked with the four keyword
            arguments above; it must return a list.
        cache_expiry_seconds: Maximum age, in seconds, of a cached result.

    Returns:
        The cached list if it is younger than ``cache_expiry_seconds``,
        otherwise the freshly fetched list (which is then cached).

    Raises:
        RequestFetchError: If fetching fails or the fetched data is not a list.
    """
    # NOTE(review): the cache is a single module-level slot and is not keyed
    # by status/id, so a cached result is returned regardless of the filter
    # arguments passed here. Confirm that callers rely on this.
    try:
        now = datetime.now(timezone.utc)
        cache_expiry = timedelta(seconds=cache_expiry_seconds)

        cached_data = usage_metrics_cache["data"]
        cached_at = usage_metrics_cache["timestamp"]
        # Compare against None rather than truthiness so that an empty
        # result list is still served from the cache.
        if cached_data is not None and cached_at is not None:
            if now - cached_at < cache_expiry:
                return cached_data

        # Fetch fresh data if the cache is empty or expired
        user_requests = await fetch_function(
            status=status,
            id=id,
            request_store=request_store,
            metric_store=metric_store,
        )

        if not isinstance(user_requests, list):
            raise TelemetryDataError("Fetched data is not in the expected list format")

        # Update the cache
        usage_metrics_cache["data"] = user_requests
        usage_metrics_cache["timestamp"] = now
        return user_requests
    except Exception as e:
        logger.error(f"Unexpected error while fetching cached user requests: {e}")
        raise RequestFetchError("Failed to retrieve or cache user requests") from e
74
118
75
119
76
120
def calculate_usage_metrics(
    user_requests: List[Dict[str, Any]], time_frames: List[Dict[str, Any]], now: datetime
) -> Dict[str, Any]:
    """Calculate usage metrics over the specified time frames.

    Args:
        user_requests: Request records. Each must have a ``"timestamp"`` key
            holding a POSIX timestamp; the user id is read from
            ``record["user"]["id"]`` when present.
        time_frames: Frame descriptors with at least ``"name"`` and ``"delta"``
            (a ``timedelta``) keys.
        now: Timezone-aware reference time; a request falls in a frame when
            its timestamp is at or after ``now - delta``.

    Returns:
        A dict with ``"total_requests"`` (int), ``"unique_users"`` (set of
        user ids) and ``"time_frame_metrics"``, which maps each frame name to
        ``{"requests": int, "unique_users": set}``.

    Raises:
        MetricCalculationError: If any record or frame is malformed.
    """
    try:
        metrics: Dict[str, Any] = {
            "total_requests": len(user_requests),
            "unique_users": set(),
            "time_frame_metrics": {},
        }
        per_frame = metrics["time_frame_metrics"]

        # Initialise every frame and compute its cutoff once; the cutoff does
        # not depend on the request being examined.
        cutoffs = []
        for frame in time_frames:
            frame_name = frame["name"]
            per_frame[frame_name] = {"requests": 0, "unique_users": set()}
            cutoffs.append((frame_name, now - frame["delta"]))

        for request_data in user_requests:
            user_id = request_data.get("user", {}).get("id")
            if user_id:
                metrics["unique_users"].add(user_id)

            request_timestamp = datetime.fromtimestamp(request_data["timestamp"], tz=timezone.utc)
            for frame_name, frame_threshold in cutoffs:
                if request_timestamp >= frame_threshold:
                    bucket = per_frame[frame_name]
                    bucket["requests"] += 1
                    if user_id:
                        bucket["unique_users"].add(user_id)

        return metrics
    except Exception as e:
        logger.error(f"Error calculating usage metrics: {e}")
        raise MetricCalculationError("An error occurred while calculating usage metrics") from e
107
156
108
157
109
158
def prepare_usage_json_metrics (metrics : Dict [str , Any ], time_frames : List [Dict [str , Any ]]) -> Dict [str , Any ]:
@@ -128,22 +177,78 @@ def prepare_usage_json_metrics(metrics: Dict[str, Any], time_frames: List[Dict[s
128
177
129
178
130
179
def set_usage_prometheus_metrics(
    registry: CollectorRegistry,
    metrics: Dict[str, Any],
    time_frames: List[Dict[str, Any]],
) -> None:
    """Define and register Prometheus gauges for the given usage data.

    Args:
        registry: Registry on which every gauge is registered.
        metrics: Must contain ``"total_requests"``, ``"unique_users"`` (a
            sized collection) and ``"time_frame_metrics"`` keyed by frame name.
        time_frames: Frame descriptors providing ``"name"``, the two metric
            names and the two metric descriptions.

    Raises:
        Exception: Any error raised while creating or setting a gauge is
            logged and re-raised unchanged.
    """
    try:
        # Totals across all requests
        total_requests_metric = Gauge("polytope_total_requests", "Total number of requests", registry=registry)
        total_requests_metric.set(metrics["total_requests"])

        unique_users_metric = Gauge("polytope_unique_users", "Total number of unique users", registry=registry)
        unique_users_metric.set(len(metrics["unique_users"]))

        # One requests gauge and one unique-users gauge per timeframe
        for frame in time_frames:
            frame_metrics = metrics["time_frame_metrics"][frame["name"]]

            requests_metric = Gauge(
                frame["request_metric_name"],
                frame["request_metric_description"],
                registry=registry,
            )
            requests_metric.set(frame_metrics["requests"])

            users_metric = Gauge(
                frame["user_metric_name"],
                frame["user_metric_description"],
                registry=registry,
            )
            users_metric.set(len(frame_metrics["unique_users"]))

    except Exception as e:
        logger.error(f"Error setting Prometheus metrics: {e}")
        raise
219
+
220
+
221
def format_output(metrics, time_frames, format: str):
    """Format metrics output as JSON or Prometheus.

    Args:
        metrics: Calculated usage metrics, passed to
            ``prepare_usage_json_metrics`` or ``set_usage_prometheus_metrics``.
        time_frames: Frame descriptors passed to the same helpers.
        format: Either ``"json"`` or ``"prometheus"``.

    Returns:
        A ``JSONResponse`` for ``"json"``, or a ``Response`` carrying the
        Prometheus text exposition for ``"prometheus"``.

    Raises:
        OutputFormatError: If ``format`` is unsupported or formatting fails.
    """
    try:
        if format == "json":
            return JSONResponse(content=prepare_usage_json_metrics(metrics, time_frames))

        if format == "prometheus":
            # Use a new CollectorRegistry for each request
            registry = CollectorRegistry()
            set_usage_prometheus_metrics(registry, metrics, time_frames)

            metrics_data = generate_latest(registry)
            return Response(content=metrics_data, media_type=CONTENT_TYPE_LATEST)

        raise OutputFormatError(f"Unsupported output format: {format}")

    except OutputFormatError as e:
        logger.error(e)
        raise  # Reraise for the main exception handler

    except Exception as e:
        logger.error(f"Error formatting output: {e}")
        raise OutputFormatError("An error occurred while formatting the output") from e
0 commit comments