Skip to content

Commit

Permalink
Add Chronos support in Mesos plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
amcaar committed Dec 21, 2015
1 parent 9873aaf commit 4921b2a
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 8 deletions.
113 changes: 106 additions & 7 deletions cluesplugins/mesos.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,100 @@ def infer_clues_job_state(state):

return res_state

# Determines the equivalent state between Chronos jobs and Clues2 possible job states
def infer_chronos_state(state):
# CHRONOS job states: idle,running,queued,failed,started,finished,disabled,skipped,expired,removed (TODO: check if there are more possible states)
# CLUES2 job states: ATTENDED o PENDING
res_state = ""
if state == 'queued': # or state == 'idle':
res_state = clueslib.request.Request.PENDING
else:
res_state = clueslib.request.Request.ATTENDED
return res_state

# Method to obtain the slave where the chronos job is executed
def obtain_chronosjob_node(job_id):
exit = " "
nodes = []
try:
exit = run_command("/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/tasks.json".split(" "))
json_data = json.loads(exit)
except:
_LOGGER.warning("could not obtain information about Mesos tasks (%s)" % (exit))
return None

job = "ChronosTask:" + job_id
if json_data:
for job, details in json_data.items():
for element in details:
jobname = str(element['name'])
if jobname == job:
node_id = str(element['slave_id'])
try:
exit = run_command("/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/slaves".split(" "))
json_data2 = json.loads(exit2)
except:
_LOGGER.warning("could not obtain information about MESOS nodes (%s)" % (exit))
return None
if json_data2:
for node, det in json_data2.items():
for element in det:
if element['id'] == node_id:
nodes.append(element['hostname'])

return nodes


class lrms(clueslib.platform.LRMS):

# Method in charge of monitoring the job queue of Chronos
def _get_chronos_jobinfolist(self):
exit = " "
jobinfolist = []
try:
exit = run_command(self._chronos)
json_data = json.loads(exit)
except:
_LOGGER.warning("could not obtain information about Chronos %s (%s)" % (self._server_ip, exit))
return None

# process the exit of the chronos command
if json_data:
for job in json_data:
job_id = job['name']
# When the job is running, the name received in mesos is "ChronosTask:<chronosJobName>"
# TODO: call Mesos: hacer un metodo que para un nombre de trabajo devuelva el nodo en el que se ejecuta
nodes = obtain_chronosjob_node(job_id)
#nodes = []
state = ""
numnodes = 1;
memory = job['mem']* 1048576
if memory <= 0:
memory = 536870912
cpus_per_task = float(job['cpus'])
# Use the fake queue
queue = '"default" in queues'
# Ask chronos the current state of the job <name>
# We obtain something like "type,jobName,lastRunStatus,currentState" for each job
try:
exit2 = run_command(self._chronos_state)
exit2 = exit2.split("\n")
for e in exit2:
if e != '':
exit2_split = e.split(",")
if exit2_split[1] == job_id:
state = infer_chronos_state(exit2_split[3])
except:
_LOGGER.warning("could not obtain information about CHRONOS job state %s (%s)" % (self._server_ip, exit))
return None

resources = clueslib.request.ResourcesNeeded(cpus_per_task, memory, [queue], numnodes)
j = clueslib.request.JobInfo(resources, job_id, nodes)
j.set_state(state)
jobinfolist.append(j)

return jobinfolist

# Method in charge of monitoring the job queue of Marathon
def _get_marathon_jobinfolist(self):
exit = " "
Expand All @@ -151,7 +243,7 @@ def _get_marathon_jobinfolist(self):
exit = run_command(self._marathon)
json_data = json.loads(exit)
except:
_LOGGER.error("could not obtain information about Marathon status %s (%s)" % (self._server_ip, exit))
_LOGGER.warning("could not obtain information about Marathon status %s (%s)" % (self._server_ip, exit))
return None

# process the exit of the marathon command
Expand Down Expand Up @@ -189,7 +281,7 @@ def _get_marathon_jobinfolist(self):

return jobinfolist

def __init__(self, MESOS_SERVER = None, MESOS_NODES_COMMAND = None, MESOS_STATE_COMMAND = None, MESOS_JOBS_COMMAND = None, MESOS_MARATHON_COMMAND = None):
def __init__(self, MESOS_SERVER = None, MESOS_NODES_COMMAND = None, MESOS_STATE_COMMAND = None, MESOS_JOBS_COMMAND = None, MESOS_MARATHON_COMMAND = None, MESOS_CHRONOS_COMMAND = None, MESOS_CHRONOS_STATE_COMMAND = None):

import cpyutils.config
config_mesos = cpyutils.config.Configuration(
Expand All @@ -199,8 +291,9 @@ def __init__(self, MESOS_SERVER = None, MESOS_NODES_COMMAND = None, MESOS_STATE
"MESOS_NODES_COMMAND": "/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/slaves",
"MESOS_STATE_COMMAND": "/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/state.json",
"MESOS_JOBS_COMMAND": "/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/tasks.json",
"MESOS_MARATHON_COMMAND": "/usr/bin/curl -L -X GET http://mesosserverpublic:8080/v2/apps?embed=tasks"
#"MESOS_CHRONOS_COMMAND":"/usr/bin/curl -L -X GET http://mesosserverpublic:4400/scheduler/jobs"
"MESOS_MARATHON_COMMAND": "/usr/bin/curl -L -X GET http://mesosserverpublic:8080/v2/apps?embed=tasks",
"MESOS_CHRONOS_COMMAND":"/usr/bin/curl -L -X GET http://mesosserverpublic:4400/scheduler/jobs",
"MESOS_CHRONOS_STATE_COMMAND":"/usr/bin/curl -L -X GET http://mesosserverpublic:4400/scheduler/graph/csv"
}
)

Expand All @@ -213,8 +306,10 @@ def __init__(self, MESOS_SERVER = None, MESOS_NODES_COMMAND = None, MESOS_STATE
self._jobs = _jobs_cmd.split(" ")
_marathon_cmd = clueslib.helpers.val_default(MESOS_MARATHON_COMMAND, config_mesos.MESOS_MARATHON_COMMAND)
self._marathon = _marathon_cmd.split(" ")
#_chronos_cmd = clueslib.helpers.val_default(MESOS_CHRONOS_COMMAND, config_mesos.MESOS_CHRONOS_COMMAND)
#self._chronos = _chronos_cmd.split(" ")
_chronos_cmd = clueslib.helpers.val_default(MESOS_CHRONOS_COMMAND, config_mesos.MESOS_CHRONOS_COMMAND)
self._chronos = _chronos_cmd.split(" ")
_chronos_state_cmd = clueslib.helpers.val_default(MESOS_CHRONOS_STATE_COMMAND, config_mesos.MESOS_CHRONOS_STATE_COMMAND)
self._chronos_state = _chronos_state_cmd.split(" ")
clueslib.platform.LRMS.__init__(self, "SLURM_%s" % self._server_ip)

def get_nodeinfolist(self):
Expand Down Expand Up @@ -411,10 +506,14 @@ def get_jobinfolist(self):
jobinfolist.append(j)

# Obtain Marathon jobs and add to the jobinfolist of Mesos
# TODO: create an equivalent method for CHRONOS
jobinfolist2 = self._get_marathon_jobinfolist();
if(jobinfolist2 != None and len(jobinfolist2) > 0):
jobinfolist = list(set(jobinfolist + jobinfolist2))

#Obtain chronos jobs and add them to jobinfolist
jobinfolist3 = self._get_chronos_jobinfolist();
if(jobinfolist3 != None and len(jobinfolist3) > 0):
jobinfolist = list(set(jobinfolist + jobinfolist3))

return jobinfolist

Expand Down
3 changes: 2 additions & 1 deletion etc/conf.d/plugin-mesos.cfg-example
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ MESOS_NODES_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master
MESOS_JOBS_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/tasks.json
MESOS_STATE_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/state.json
MESOS_MARATHON_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:8080/v2/apps?embed=tasks
#MESOS_CHRONOS_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:4400/scheduler/jobs
MESOS_CHRONOS_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:4400/scheduler/jobs
MESOS_CHRONOS_STATE_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:4400/scheduler/graph/csv

0 comments on commit 4921b2a

Please sign in to comment.