4
4
import cloudpickle
5
5
from .utils import remote_exception
6
6
import logging
7
+ from uuid import uuid4
8
+ from mentos .utils import decode_data ,encode_data
7
9
log = logging .getLogger (__name__ )
8
10
9
-
11
+ logging . getLogger (). setLevel ( logging . DEBUG )
10
12
# u('string') replaces the forwards-incompatible u'string'
11
13
if six .PY3 :
12
14
def u (string ):
@@ -25,9 +27,24 @@ def u(string):
25
27
iteritems = dict .iteritems
26
28
iterkeys = dict .iterkeys
27
29
30
def bunchify(x):
    """Recursively transform a dictionary into a Message via copy.

    Delegates to ``Message.convert``: dicts are rebuilt as ``Message``
    instances, lists and tuples are rebuilt with converted members, and any
    other value is returned unchanged.

    Previously this function had no body and always returned ``None``.
    """
    # ``Message`` is defined later in this module; the name is only looked up
    # when the function is called, so the forward reference is safe.
    return Message.convert(x)
34
+
35
+
28
36
class Message (dict ):
29
37
""" A dictionary that provides attribute-style access.
30
38
"""
39
@classmethod
def convert(cls, x):
    """Return ``x`` with every nested dict rebuilt as an instance of ``cls``.

    * dict  -> ``cls`` built from the (key, converted value) pairs
    * list / tuple -> same container type holding the converted members
    * anything else -> returned as is (no copy is made)
    """
    if isinstance(x, dict):
        pairs = ((key, cls.convert(value)) for key, value in iteritems(x))
        return cls(pairs)
    if isinstance(x, (list, tuple)):
        container = type(x)
        return container(cls.convert(member) for member in x)
    return x
47
+
31
48
32
49
def __contains__ (self , k ):
33
50
"""
@@ -112,7 +129,10 @@ class ResourceMixin(object):
112
129
@staticmethod
113
130
def flatten (message ):
114
131
flattened = {}
115
- for r in message ['resources' ]:
132
+ if isinstance (message , (int , float , complex )):
133
+ val = message
134
+ message = {"resources" :[Cpus (val ),Disk (val ),Mem (val )]}
135
+ for r in message ["resources" ]:
116
136
if r ["type" ]== "RANGES" :
117
137
flattened [r ['name' ]] = r ['ranges' ]['range' ]
118
138
else :
@@ -190,21 +210,21 @@ def __isub__(self, second):
190
210
def cpus(self):
    """Return the first "cpus" entry of ``self.resources`` as a Message.

    Falls back to ``Cpus(0.0)`` when no entry carries that name.
    """
    matching = (entry for entry in self.resources if entry["name"] == "cpus")
    found = next(matching, None)
    if found is None:
        return Cpus(0.0)
    return Message.convert(found)
195
215
196
216
@property
def mem(self):
    """Return the first "mem" entry of ``self["resources"]`` as a Message.

    Falls back to ``Mem(0.0)`` when no entry carries that name.
    """
    matching = (entry for entry in self["resources"] if entry["name"] == "mem")
    found = next(matching, None)
    if found is None:
        return Mem(0.0)
    return Message.convert(found)
202
222
203
223
@property
def disk(self):
    """Return the first "disk" entry of ``self["resources"]`` as a Message.

    Falls back to ``Disk(0.0)`` when no entry carries that name.
    """
    matching = (entry for entry in self["resources"] if entry["name"] == "disk")
    found = next(matching, None)
    if found is None:
        return Disk(0.0)
    return Message.convert(found)
209
229
210
230
@property
@@ -218,28 +238,35 @@ def ports(self):
218
238
class TaskInfo(ResourceMixin, Message):
    """A Message that also mixes in ResourceMixin; adds no members of its own."""
220
240
221
- class Offer (ResourceMixin , Message ):
222
- pass
223
-
224
241
225
242
226
- class PickleMixin (object ):
243
class Offer(ResourceMixin, Message):
    """A Message with ResourceMixin helpers and interchangeable id lookups.

    ``slave_id`` and ``agent_id`` each read their own key first and fall back
    to the other key, so either name works whichever one the dictionary holds.
    """

    @property
    def slave_id(self):
        """Return ``self["slave_id"]``, or ``self["agent_id"]`` if it is missing."""
        try:
            value = self["slave_id"]
        except KeyError:
            value = self["agent_id"]
        return value

    @property
    def agent_id(self):
        """Return ``self["agent_id"]``, or ``self["slave_id"]`` if it is missing."""
        try:
            value = self["agent_id"]
        except KeyError:
            value = self["slave_id"]
        return value
231
257
232
- @data .setter
233
- def data (self , value ):
234
- self ['data' ] = cloudpickle .dumps (value )
235
258
236
259
237
- class PythonTaskStatus (PickleMixin , Message ):
260
+ class PythonTaskStatus (Message ):
238
261
239
262
def __init__(self, data=None, **kwargs):
    """Initialise the status from ``kwargs`` and store the pickled ``data``.

    ``data`` is serialised with ``cloudpickle.dumps`` and the resulting bytes
    are passed through ``encode_data`` before being stored under ``data``.
    """
    super(PythonTaskStatus, self).__init__(**kwargs)
    python_label = Message(key='python')
    self.labels = Message(labels=python_label)
    pickled = cloudpickle.dumps(data)
    self.data = encode_data(pickled)
266
+
267
@property
def result(self):
    """Return the object stored under ``self["data"]``, decoded and unpickled."""
    # NOTE(review): unpickling runs arbitrary code; this is only safe when the
    # payload comes from a trusted peer.
    encoded = self["data"]
    pickled = decode_data(encoded)
    return cloudpickle.loads(pickled)
243
270
244
271
@property
245
272
def exception (self ):
@@ -248,41 +275,66 @@ def exception(self):
248
275
except :
249
276
return None
250
277
278
def is_staging(self):
    """Return True when ``self.state`` equals 'TASK_STAGING'."""
    return self.state == 'TASK_STAGING'

def is_starting(self):
    """Return True when ``self.state`` equals 'TASK_STARTING'."""
    return self.state == 'TASK_STARTING'

def is_running(self):
    """Return True when ``self.state`` equals 'TASK_RUNNING'."""
    return self.state == 'TASK_RUNNING'

def has_finished(self):
    """Return True when ``self.state`` equals 'TASK_FINISHED'."""
    return self.state == 'TASK_FINISHED'

def has_succeeded(self):
    """Return True when ``self.state`` equals 'TASK_FINISHED' (same test as has_finished)."""
    return self.state == 'TASK_FINISHED'

def has_killed(self):
    """Return True when ``self.state`` equals 'TASK_KILLED'."""
    return self.state == 'TASK_KILLED'

def has_failed(self):
    """Return True when ``self.state`` is one of the failure states.

    A killed task counts as failed here as well.
    """
    failure_states = ('TASK_FAILED', 'TASK_LOST', 'TASK_KILLED', 'TASK_ERROR')
    return self.state in failure_states

def has_terminated(self):
    """Return True when the task has either succeeded or failed."""
    if self.has_succeeded():
        return True
    return self.has_failed()
251
302
252
303
# TODO create custom messages per executor
253
- class PythonTask (PickleMixin , TaskInfo ):
304
+ class PythonTask (TaskInfo ):
254
305
255
306
256
307
def __init__(self, fn=None, args=None, kwargs=None, resources=None,
             executor=None, retries=3, **kwds):
    """Create a task wrapping a Python callable and its arguments.

    Parameters:
        fn: callable invoked by ``__call__``.
        args: positional arguments for ``fn``; a new empty list when omitted.
        kwargs: keyword arguments for ``fn``; a new empty dict when omitted.
        resources: resource list stored on the task; a new
            ``[Cpus(0.1), Mem(128), Disk(0)]`` list when omitted.
        executor: executor message; when falsy a
            ``PythonExecutor("python-executor")`` is created.
        retries: stored as ``self.retries`` and compared against
            ``self.attempt`` by ``retry``.
        **kwds: forwarded to the base-class initialiser.

    The defaults for ``args``, ``kwargs`` and ``resources`` used to be
    mutable literals in the signature, so every task created without them
    shared (and could mutate) the very same list/dict objects. They are now
    created per instance.
    """
    super(PythonTask, self).__init__(**kwds)

    # Build fresh containers per instance instead of sharing signature defaults.
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    if resources is None:
        resources = [Cpus(0.1), Mem(128), Disk(0)]

    # Keep a task_id supplied through kwds, otherwise generate a random one.
    self.task_id = self.get("task_id", Message(value=str(uuid4())))
    self.status = PythonTaskStatus(task_id=self.task_id, state='TASK_STAGING')
    self.executor = executor or PythonExecutor("python-executor")
    # NOTE(review): a "data" value supplied through kwds is pickled and
    # encoded again here -- confirm callers never pass an already-encoded
    # payload.
    self.data = encode_data(cloudpickle.dumps(self.get("data", (fn, args, kwargs))))
    self.resources = resources
    self.retries = retries
    self.attempt = 1

    self.labels = Message(labels=Message(key='python'))
268
320
269
321
def __call__(self):
    """Decode and unpickle ``self.data``, then run the stored callable.

    Returns whatever ``fn(*args, **kwargs)`` returns.
    """
    payload = cloudpickle.loads(decode_data(self.data))
    func, positional, named = payload
    return func(*positional, **named)
272
324
273
325
def retry(self, status):
    """Reschedule the task after a failure, or abort once attempts run out.

    While ``self.attempt`` is below ``self.retries`` the attempt counter is
    incremented and ``status.state`` is reset to 'TASK_STAGING'. Otherwise
    the failure is logged and a ``RuntimeError`` is raised.
    """
    if not self.attempt < self.retries:
        # Out of attempts: report and give up.
        abort_message = ('Aborting due to task {} failed for {} attempts in state '
                         '{} with message {}').format(
            self.task_id, self.retries, status.state, status.message)
        log.error(abort_message)
        raise RuntimeError('Task {} failed with state {} and message {}'.format(
            self.task_id, status.state, status.message))

    retry_message = ('Task {} attempt #{} rescheduled due to failure with state '
                     '{} and message {}').format(
        self.task_id, self.attempt, status.state, status.message)
    log.info(retry_message)
    self.attempt += 1
    status.state = 'TASK_STAGING'
286
338
287
339
def update (self , status ):
288
340
self .on_update (status )
@@ -294,14 +346,14 @@ def update(self, status):
294
346
def on_update(self, status):
    """Store ``status`` as the task's current status and log the new state."""
    self.status = status  # update task's status
    message = 'Task {} has been updated with state {}'.format(
        self.task_id.value, status.state)
    log.info(message)
298
350
299
351
def on_success(self, status):
    """Log that the task succeeded; ``status`` is not used."""
    task_name = self.task_id.value
    log.info('Task {} has been succeded'.format(task_name))
301
353
302
354
def on_fail (self , status ):
303
355
log .error ('Task {} has been failed with state {} due to {}' .format (
304
- self .id .value , status .state , status .message ))
356
+ self .task_id .value , status .state , status .message ))
305
357
306
358
try :
307
359
raise status .exception # won't retry due to code error in PythonTaskStatus
@@ -310,21 +362,22 @@ def on_fail(self, status):
310
362
self .retry (status )
311
363
else :
312
364
log .error ('Aborting due to task {} failed with state {} and message '
313
- '{}' .format (self .id , status .state , status .message ))
365
+ '{}' .format (self .task_id , status .state , status .message ))
314
366
315
367
316
368
class PythonExecutor (Message ):
317
369
318
370
319
371
320
- def __init__ (self , docker = 'satyr' , force_pull = False ,
372
+ def __init__ (self , id , docker = 'satyr' , force_pull = False ,
321
373
envs = {}, uris = [], ** kwds ):
322
374
super (PythonExecutor , self ).__init__ (** kwds )
323
375
self .container = Message (
324
376
type = 'MESOS' ,
325
377
mesos = Message (
326
378
image = Message (type = 'DOCKER' ,
327
379
docker = Message (name = docker ))))
380
+ self .executor_id = Message (value = id )
328
381
self .command = Message (value = 'python -m satyr.executor' ,
329
382
shell = True )
330
383
self .force_pull = force_pull
0 commit comments