 LOG = logging.getLogger('fastpurge')


-
 Purge = namedtuple('Purge', [
     # Response from Akamai when this purge was created
     'response_body',
@@ -48,16 +47,17 @@ def new(self, **kw):
     def increment(self, method, url, *args, **kwargs):
         response = kwargs.get("response")
         if response:
-            self._logger.error("An invalid status code %s was received "
-                               " when trying to %s to %s: %s"
-                               % (response.status, method, url, response.reason))
+            self._logger.error(
+                "An invalid status code %s was received when trying to %s to %s: %s" % (
+                    response.status, method, url, response.reason))
         else:
             self._logger.error(
                 "An unknown error occurred when trying to %s to %s"
                 % (method, url))
         return super(LoggingRetry, self).increment(method, url, *args,
                                                    **kwargs)

+
 class FastPurgeError(RuntimeError):
     """Raised when the Fast Purge API reports an error.

@@ -98,7 +98,8 @@ class FastPurgeClient(object):
     MAX_REQUESTS = int(os.environ.get("FAST_PURGE_MAX_REQUESTS", "10"))

     # Default network matches Akamai's documented default
-    DEFAULT_NETWORK = os.environ.get("FAST_PURGE_DEFAULT_NETWORK", "production")
+    DEFAULT_NETWORK = os.environ.get("FAST_PURGE_DEFAULT_NETWORK",
+                                     "production")

     # Max number of retries allowed for HTTP requests, and the backoff used
     # to extend the delay between requests.
@@ -168,11 +169,11 @@ def __executor(self):
         if self.___executor is None:
             with self.__lock:
                 if self.___executor is None:
-                    self.___executor = Executors.\
-                        sync(name="fastpurge").\
-                        with_poll(self.__poll_purges).\
-                        with_throttle(count=self.MAX_REQUESTS).\
-                        with_retry().\
+                    self.___executor = Executors. \
+                        sync(name="fastpurge"). \
+                        with_poll(self.__poll_purges). \
+                        with_throttle(count=self.MAX_REQUESTS). \
+                        with_retry(). \
                         with_cancel_on_shutdown()
         return self.___executor

@@ -189,7 +190,8 @@ def __poll_purges(cls, descriptors):
         earliest = now + cls.DEFAULT_DELAY

         for descriptor in descriptors:
-            purge_id = descriptor.result.response_body.get('purgeId', '<unknown purge>')
+            purge_id = descriptor.result.response_body.get('purgeId',
+                                                           '<unknown purge>')
             when_complete = descriptor.result.estimated_complete
             earliest = min(earliest, when_complete)
             if when_complete > now:
@@ -212,7 +214,8 @@ def __poll_purges(cls, descriptors):
             # here.
             descriptor.yield_result(descriptor.result.response_body)

-        LOG.debug("now %s, earliest %s, sleep %s", now, earliest, earliest - now)
+        LOG.debug("now %s, earliest %s, sleep %s", now, earliest,
+                  earliest - now)
         return min(earliest - now, cls.DEFAULT_DELAY)

     @property
@@ -267,25 +270,27 @@ def __get_request_bodies(self, objects):

         # Too big for a single request.
         # Split it in half and try again
-        part = int(len(objects)/2)
+        part = int(len(objects) / 2)
         objects_a, objects_b = objects[:part], objects[part:]
-        return self.__get_request_bodies(objects_a) + self.__get_request_bodies(objects_b)
+        return self.__get_request_bodies(
+            objects_a) + self.__get_request_bodies(objects_b)

     def __start_purge(self, endpoint, request_body):
         headers = {'Content-Type': 'application/json'}
         LOG.debug("POST JSON of size %s to %s", len(request_body), endpoint)
         try:
-            response = self.__session.post(endpoint, data=request_body, headers=headers)
+            response = self.__session.post(endpoint, data=request_body,
+                                           headers=headers)
             response_body = response.json()
             estimated_seconds = response_body.get('estimatedSeconds', 5)
             return Purge(response_body, monotonic() + estimated_seconds)
         except RetryError as e:
             message = "Request to {endpoint} was unsuccessful after {retries} retries: {reason}". \
-                format(endpoint=endpoint, retries=self.MAX_RETRIES, reason=e.args[0].reason)
+                format(endpoint=endpoint, retries=self.MAX_RETRIES,
+                       reason=e.args[0].reason)
             LOG.debug(message)
             raise FastPurgeError(message)

-
     def purge_objects(self, object_type, objects, **kwargs):
         """Purge a collection of objects.

@@ -348,7 +353,8 @@ def purge_objects(self, object_type, objects, **kwargs):

         futures = []
         for request_body in request_bodies:
-            future = self.__executor.submit(self.__start_purge, endpoint, request_body)
+            future = self.__executor.submit(self.__start_purge, endpoint,
+                                            request_body)
             futures.append(future)

         return f_sequence(futures)
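For orientation, a minimal usage sketch of the client touched by this diff. It assumes the package exports FastPurgeClient and FastPurgeError; the credential keys, host, and URL shown here are placeholders for illustration, not values taken from this change:

    from fastpurge import FastPurgeClient, FastPurgeError

    # Hypothetical credentials; real values come from an Akamai OPEN API client.
    client = FastPurgeClient(auth={
        "client_secret": "...",
        "host": "akaa-xxxxxxxx.purge.akamaiapis.net",
        "access_token": "...",
        "client_token": "...",
    })

    # purge_objects() returns a future; result() blocks until the purge is
    # estimated complete, or re-raises FastPurgeError if a request failed.
    try:
        future = client.purge_objects("url", ["https://example.com/index.html"])
        responses = future.result()
    except FastPurgeError as error:
        print("Purge failed:", error)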