26
26
27
27
LOG = logging .getLogger ('fastpurge' )
28
28
29
-
30
29
Purge = namedtuple ('Purge' , [
31
30
# Response from Akamai when this purge was created
32
31
'response_body' ,
@@ -50,14 +49,16 @@ def increment(self, method, url, *args, **kwargs):
50
49
if response :
51
50
self ._logger .error ("An invalid status code %s was received "
52
51
"when trying to %s to %s: %s"
53
- % (response .status , method , url , response .reason ))
52
+ % (
53
+ response .status , method , url , response .reason ))
54
54
else :
55
55
self ._logger .error (
56
56
"An unknown error occurred when trying to %s to %s"
57
57
% (method , url ))
58
58
return super (LoggingRetry , self ).increment (method , url , * args ,
59
59
** kwargs )
60
60
61
+
61
62
class FastPurgeError (RuntimeError ):
62
63
"""Raised when the Fast Purge API reports an error.
63
64
@@ -98,7 +99,8 @@ class FastPurgeClient(object):
98
99
MAX_REQUESTS = int (os .environ .get ("FAST_PURGE_MAX_REQUESTS" , "10" ))
99
100
100
101
# Default network matches Akamai's documented default
101
- DEFAULT_NETWORK = os .environ .get ("FAST_PURGE_DEFAULT_NETWORK" , "production" )
102
+ DEFAULT_NETWORK = os .environ .get ("FAST_PURGE_DEFAULT_NETWORK" ,
103
+ "production" )
102
104
103
105
# Max number of retries allowed for HTTP requests, and the backoff used
104
106
# to extend the delay between requests.
@@ -168,11 +170,11 @@ def __executor(self):
168
170
if self .___executor is None :
169
171
with self .__lock :
170
172
if self .___executor is None :
171
- self .___executor = Executors .\
172
- sync (name = "fastpurge" ).\
173
- with_poll (self .__poll_purges ).\
174
- with_throttle (count = self .MAX_REQUESTS ).\
175
- with_retry ().\
173
+ self .___executor = Executors . \
174
+ sync (name = "fastpurge" ). \
175
+ with_poll (self .__poll_purges ). \
176
+ with_throttle (count = self .MAX_REQUESTS ). \
177
+ with_retry (). \
176
178
with_cancel_on_shutdown ()
177
179
return self .___executor
178
180
@@ -189,7 +191,8 @@ def __poll_purges(cls, descriptors):
189
191
earliest = now + cls .DEFAULT_DELAY
190
192
191
193
for descriptor in descriptors :
192
- purge_id = descriptor .result .response_body .get ('purgeId' , '<unknown purge>' )
194
+ purge_id = descriptor .result .response_body .get ('purgeId' ,
195
+ '<unknown purge>' )
193
196
when_complete = descriptor .result .estimated_complete
194
197
earliest = min (earliest , when_complete )
195
198
if when_complete > now :
@@ -212,7 +215,8 @@ def __poll_purges(cls, descriptors):
212
215
# here.
213
216
descriptor .yield_result (descriptor .result .response_body )
214
217
215
- LOG .debug ("now %s, earliest %s, sleep %s" , now , earliest , earliest - now )
218
+ LOG .debug ("now %s, earliest %s, sleep %s" , now , earliest ,
219
+ earliest - now )
216
220
return min (earliest - now , cls .DEFAULT_DELAY )
217
221
218
222
@property
@@ -267,25 +271,27 @@ def __get_request_bodies(self, objects):
267
271
268
272
# Too big for a single request.
269
273
# Split it in half and try again
270
- part = int (len (objects )/ 2 )
274
+ part = int (len (objects ) / 2 )
271
275
objects_a , objects_b = objects [:part ], objects [part :]
272
- return self .__get_request_bodies (objects_a ) + self .__get_request_bodies (objects_b )
276
+ return self .__get_request_bodies (
277
+ objects_a ) + self .__get_request_bodies (objects_b )
273
278
274
279
def __start_purge (self , endpoint , request_body ):
275
280
headers = {'Content-Type' : 'application/json' }
276
281
LOG .debug ("POST JSON of size %s to %s" , len (request_body ), endpoint )
277
282
try :
278
- response = self .__session .post (endpoint , data = request_body , headers = headers )
283
+ response = self .__session .post (endpoint , data = request_body ,
284
+ headers = headers )
279
285
response_body = response .json ()
280
286
estimated_seconds = response_body .get ('estimatedSeconds' , 5 )
281
287
return Purge (response_body , monotonic () + estimated_seconds )
282
288
except RetryError as e :
283
289
message = "Request to {endpoint} was unsuccessful after {retries} retries: {reason}" . \
284
- format (endpoint = endpoint , retries = self .MAX_RETRIES , reason = e .args [0 ].reason )
290
+ format (endpoint = endpoint , retries = self .MAX_RETRIES ,
291
+ reason = e .args [0 ].reason )
285
292
LOG .debug (message )
286
293
raise FastPurgeError (message )
287
294
288
-
289
295
def purge_objects (self , object_type , objects , ** kwargs ):
290
296
"""Purge a collection of objects.
291
297
@@ -348,7 +354,8 @@ def purge_objects(self, object_type, objects, **kwargs):
348
354
349
355
futures = []
350
356
for request_body in request_bodies :
351
- future = self .__executor .submit (self .__start_purge , endpoint , request_body )
357
+ future = self .__executor .submit (self .__start_purge , endpoint ,
358
+ request_body )
352
359
futures .append (future )
353
360
354
361
return f_sequence (futures )
0 commit comments