diff --git a/cluesplugins/mesos.py b/cluesplugins/mesos.py
index 2590ae5..1ee4bef 100644
--- a/cluesplugins/mesos.py
+++ b/cluesplugins/mesos.py
@@ -141,8 +141,119 @@ def infer_clues_job_state(state):
 
     return res_state
 
+# Determines the equivalent state between Chronos jobs and Clues2 possible job states
+def infer_chronos_state(state):
+    # CHRONOS job states: idle,running,queued,failed,started,finished,disabled,skipped,expired,removed (TODO: check if there are more possible states)
+    # CLUES2 job states: ATTENDED or PENDING
+    res_state = ""
+    if state == 'queued': # or state == 'idle':
+        res_state = clueslib.request.Request.PENDING
+    else:
+        res_state = clueslib.request.Request.ATTENDED
+    return res_state
+
+# Parses the output of the Chronos "graph/csv" request and returns a dict {job name: Chronos state}
+def _parse_chronos_states(csv_data):
+    # The job rows look like "type,jobName,lastRunStatus,currentState"; the rows with less fields
+    # (presumably the links between dependent jobs) have no state, so they are skipped
+    states = {}
+    for line in csv_data.split("\n"):
+        fields = line.strip().split(",")
+        if len(fields) >= 4:
+            states[fields[1]] = fields[3]
+    return states
+
+# Method to obtain the slaves where the chronos job is executed
+# Returns the list of hostnames (empty if the job has no task in Mesos) or None if Mesos cannot be queried
+def obtain_chronosjob_node(job_id):
+    # NOTE(review): the Mesos endpoints are hard-coded here, so MESOS_JOBS_COMMAND and MESOS_NODES_COMMAND are not honoured
+    output = " "
+    try:
+        output = run_command("/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/tasks.json".split(" "))
+        tasks_data = json.loads(output)
+    except Exception:
+        _LOGGER.warning("could not obtain information about Mesos tasks (%s)" % (output))
+        return None
+
+    # When the job is running, the name of its task in Mesos is "ChronosTask:<job name>"
+    task_name = "ChronosTask:" + job_id
+    slave_ids = []
+    if tasks_data:
+        for tasks in tasks_data.values():
+            for task in tasks:
+                if str(task['name']) == task_name:
+                    slave_ids.append(str(task['slave_id']))
+    if not slave_ids:
+        return []
+
+    # The slaves are requested only once, and only when the job has some task
+    output = " "
+    try:
+        output = run_command("/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/slaves".split(" "))
+        slaves_data = json.loads(output)
+    except Exception:
+        _LOGGER.warning("could not obtain information about MESOS nodes (%s)" % (output))
+        return None
+
+    nodes = []
+    if slaves_data:
+        for slaves in slaves_data.values():
+            for slave in slaves:
+                if slave['id'] in slave_ids:
+                    nodes.append(slave['hostname'])
+    return nodes
+
+
 class lrms(clueslib.platform.LRMS):
 
+    # Method in charge of monitoring the job queue of Chronos
+    # Returns a list of JobInfo (one per Chronos job) or None if Chronos cannot be queried
+    def _get_chronos_jobinfolist(self):
+        output = " "
+        try:
+            output = run_command(self._chronos)
+            json_data = json.loads(output)
+        except Exception:
+            _LOGGER.warning("could not obtain information about Chronos %s (%s)" % (self._server_ip, output))
+            return None
+
+        jobinfolist = []
+        if not json_data:
+            return jobinfolist
+
+        # Ask chronos the current state of the jobs (only once, because the same output is used for every job)
+        state_output = " "
+        try:
+            state_output = run_command(self._chronos_state)
+            chronos_states = _parse_chronos_states(state_output)
+        except Exception:
+            _LOGGER.warning("could not obtain information about CHRONOS job state %s (%s)" % (self._server_ip, state_output))
+            return None
+
+        # process the exit of the chronos command
+        for job in json_data:
+            job_id = job['name']
+            # Slaves in which the job is running (no nodes if Mesos could not be queried)
+            nodes = obtain_chronosjob_node(job_id) or []
+            state = ""
+            if job_id in chronos_states:
+                state = infer_chronos_state(chronos_states[job_id])
+            numnodes = 1
+            # job['mem'] is multiplied by 1024*1024 (presumably MB to bytes); 536870912 (512 * 1024 * 1024) is the fallback
+            memory = job['mem'] * 1048576
+            if memory <= 0:
+                memory = 536870912
+            cpus_per_task = float(job['cpus'])
+            # Use the fake queue
+            queue = '"default" in queues'
+
+            resources = clueslib.request.ResourcesNeeded(cpus_per_task, memory, [queue], numnodes)
+            j = clueslib.request.JobInfo(resources, job_id, nodes)
+            j.set_state(state)
+            jobinfolist.append(j)
+
+        return jobinfolist
+
     # Method in charge of monitoring the job queue of Marathon
     def _get_marathon_jobinfolist(self):
         exit = " "
@@ -151,7 +262,7 @@ def _get_marathon_jobinfolist(self):
             exit = run_command(self._marathon)
             json_data = json.loads(exit)
         except:
-            _LOGGER.error("could not obtain information about Marathon status %s (%s)" % (self._server_ip, exit))
+            _LOGGER.warning("could not obtain information about Marathon status %s (%s)" % (self._server_ip, exit))
             return None
 
         # process the exit of the marathon command
@@ -189,7 +300,7 @@ def _get_marathon_jobinfolist(self):
 
         return jobinfolist
 
-    def __init__(self, MESOS_SERVER = None, MESOS_NODES_COMMAND = None, MESOS_STATE_COMMAND = None, MESOS_JOBS_COMMAND = None, MESOS_MARATHON_COMMAND = None):
+    def __init__(self, MESOS_SERVER = None, MESOS_NODES_COMMAND = None, MESOS_STATE_COMMAND = None, MESOS_JOBS_COMMAND = None, MESOS_MARATHON_COMMAND = None, MESOS_CHRONOS_COMMAND = None, MESOS_CHRONOS_STATE_COMMAND = None):
 
         import cpyutils.config
         config_mesos = cpyutils.config.Configuration(
@@ -199,8 +310,9 @@ def __init__(self, MESOS_SERVER = None, MESOS_NODES_COMMAND = None, MESOS_STATE
                 "MESOS_NODES_COMMAND": "/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/slaves",
                 "MESOS_STATE_COMMAND": "/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/state.json",
                 "MESOS_JOBS_COMMAND": "/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/tasks.json",
-                "MESOS_MARATHON_COMMAND": "/usr/bin/curl -L -X GET http://mesosserverpublic:8080/v2/apps?embed=tasks"
-                #"MESOS_CHRONOS_COMMAND":"/usr/bin/curl -L -X GET http://mesosserverpublic:4400/scheduler/jobs"
+                "MESOS_MARATHON_COMMAND": "/usr/bin/curl -L -X GET http://mesosserverpublic:8080/v2/apps?embed=tasks",
+                "MESOS_CHRONOS_COMMAND":"/usr/bin/curl -L -X GET http://mesosserverpublic:4400/scheduler/jobs",
+                "MESOS_CHRONOS_STATE_COMMAND":"/usr/bin/curl -L -X GET http://mesosserverpublic:4400/scheduler/graph/csv"
             }
         )
 
@@ -213,8 +325,10 @@ def __init__(self, MESOS_SERVER = None, MESOS_NODES_COMMAND = None, MESOS_STATE
         self._jobs = _jobs_cmd.split(" ")
         _marathon_cmd = clueslib.helpers.val_default(MESOS_MARATHON_COMMAND, config_mesos.MESOS_MARATHON_COMMAND)
         self._marathon = _marathon_cmd.split(" ")
-        #_chronos_cmd = clueslib.helpers.val_default(MESOS_CHRONOS_COMMAND, config_mesos.MESOS_CHRONOS_COMMAND)
-        #self._chronos = _chronos_cmd.split(" ")
+        _chronos_cmd = clueslib.helpers.val_default(MESOS_CHRONOS_COMMAND, config_mesos.MESOS_CHRONOS_COMMAND)
+        self._chronos = _chronos_cmd.split(" ")
+        _chronos_state_cmd = clueslib.helpers.val_default(MESOS_CHRONOS_STATE_COMMAND, config_mesos.MESOS_CHRONOS_STATE_COMMAND)
+        self._chronos_state = _chronos_state_cmd.split(" ")
         clueslib.platform.LRMS.__init__(self, "SLURM_%s" % self._server_ip)
 
     def get_nodeinfolist(self):
@@ -411,10 +525,14 @@ def get_jobinfolist(self):
                     jobinfolist.append(j)
 
         # Obtain Marathon jobs and add to the jobinfolist of Mesos
-        # TODO: create an equivalent method for CHRONOS
         jobinfolist2 = self._get_marathon_jobinfolist();
         if(jobinfolist2 != None and len(jobinfolist2) > 0):
             jobinfolist = list(set(jobinfolist + jobinfolist2))
+
+        # Obtain Chronos jobs and add them to the jobinfolist of Mesos
+        jobinfolist3 = self._get_chronos_jobinfolist()
+        if jobinfolist3:
+            jobinfolist = list(set(jobinfolist + jobinfolist3))
 
         return jobinfolist
 
diff --git a/etc/conf.d/plugin-mesos.cfg-example b/etc/conf.d/plugin-mesos.cfg-example
index ef5d726..292a1b5 100644
--- a/etc/conf.d/plugin-mesos.cfg-example
+++ b/etc/conf.d/plugin-mesos.cfg-example
@@ -11,4 +11,5 @@ MESOS_NODES_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master
 MESOS_JOBS_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/tasks.json
 MESOS_STATE_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:5050/master/state.json
 MESOS_MARATHON_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:8080/v2/apps?embed=tasks
-#MESOS_CHRONOS_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:4400/scheduler/jobs
+MESOS_CHRONOS_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:4400/scheduler/jobs
+MESOS_CHRONOS_STATE_COMMAND=/usr/bin/curl -L -X GET http://mesosserverpublic:4400/scheduler/graph/csv