Skip to content

Commit

Permalink
Job with type envelope must ask resources now (more simple)
Browse files Browse the repository at this point in the history
  • Loading branch information
augu5te committed Sep 25, 2024
1 parent 909bc9f commit eb03978
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 70 deletions.
3 changes: 0 additions & 3 deletions oar/cli/oarsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ def connect_job(session, config, job_id, stop_oarexec, openssh_cmd, cmd_ret):
host_to_connect_via_ssh = config["COSYSTEM_HOSTNAME"]
elif "deploy" in types:
host_to_connect_via_ssh = config["DEPLOY_HOSTNAME"]
elif "envelope" in types:
host_to_connect_via_ssh = config["ENVELOPE_HOSTNAME"]

# cpuset part
cpuset_field = config["JOB_RESOURCE_MANAGER_PROPERTY_DB_FIELD"]
Expand All @@ -89,7 +87,6 @@ def connect_job(session, config, job_id, stop_oarexec, openssh_cmd, cmd_ret):
and cpuset_path
and ("cosystem" not in types)
and ("deploy" not in types)
and ("envelope" not in types)
and hosts
):
os.environ["OAR_CPUSET"] = (
Expand Down
21 changes: 2 additions & 19 deletions oar/kao/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from oar.lib.job_handling import ALLOW, JobPseudo
from oar.lib.models import Job

# for quotas and job envelope
from oar.lib.resource import MAX_NB_RESOURCES, ResourceSet
# for quotas
from oar.lib.resource import ResourceSet

config, db = init_oar(no_db=True)
logger = get_logger("oar.kamelot", forward_stderr=True)
Expand Down Expand Up @@ -289,12 +289,6 @@ def find_first_suitable_contiguous_slots_no_quotas(
itvs_avail = intersec_supersed_itvs_slots(
slots, sid_left, sid_right, job.supersed
)
# jid_to_supersed = int(job.supersed)
# if jid_to_supersed in scheduled_jobs.keys():
# job_supersed = scheduled_jobs[jid_to_supersed]
# itvs_avail = intersec_supersed_itvs_slots(
# slots, sid_left, sid_right, job_supersed
# )

if job.find:
itvs = job.find_func(
Expand Down Expand Up @@ -450,17 +444,6 @@ def schedule_id_jobs_ct(slots_sets, jobs, hy, id_jobs, job_security_time):
min_start_time = -1
to_skip = False

if "envelope" in job.types:
logger.info("Job envelope, no resources is needed")
mld_id, walltime, _ = job.mld_res_rqts[0]
job.moldable_id = mld_id
# Correspond to null resources (resource_id = 0 in database)
job.res_set = ProcSet(MAX_NB_RESOURCES - 1)
# Start job immediately
job.start_time = 0
job.walltime = walltime
continue

# Dependencies
for j_dep in job.deps:
jid_dep, state, exit_code = j_dep
Expand Down
15 changes: 3 additions & 12 deletions oar/lib/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ def apply_admission_rules(
if ("ADMISSION_RULES_IN_FILES" in config) and (
config["ADMISSION_RULES_IN_FILES"] == "yes"
):

if rule:
regex = rule
else:
Expand Down Expand Up @@ -881,17 +880,9 @@ def add_micheline_subjob(
properties = job_parameters.properties
resource_request = job_parameters.resource_request

if "envelope" in job_parameters.types:
error = (0,)
resource_desc, walltime = resource_request[0]
if not walltime:
walltime = str(config["DEFAULT_JOB_WALLTIME"])
estimated_nb_resources = [(0, walltime)]
else:
error, resource_available, estimated_nb_resources = estimate_job_nb_resources(
session, config, resource_request, properties
)

error, resource_available, estimated_nb_resources = estimate_job_nb_resources(
session, config, resource_request, properties
)
if error[0] != 0:
return (error, -1)

Expand Down
11 changes: 2 additions & 9 deletions oar/modules/bipbip.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ def run(self, session, config):
if cpuset_path and cpuset_name:
cpuset_full_path = cpuset_path + "/" + cpuset_name

if "envelope" in job_types.keys():
hosts = [config["ENVELOPE_HOSTNAME"]]

# Check if we must treate the end of a oarexec
if self.oarexec_reattach_exit_value and job.state in [
"Launching",
Expand Down Expand Up @@ -239,7 +236,6 @@ def run(self, session, config):
if (
("deploy" not in job_types.keys())
and ("cosystem" not in job_types.keys())
and ("envelope" not in job_types.keys())
and (len(hosts) > 0)
):
bad = []
Expand All @@ -252,8 +248,8 @@ def run(self, session, config):
nodes_cpuset_fields = get_cpuset_values(
session, config, cpuset_field, job.assigned_moldable_job
)
# import pdb; pdb.set_trace()
self.logger.debug(f"xxxxxxxxxxxxx cpuset_name: {cpuset_name}")

self.logger.debug(f"cpuset_name: {cpuset_name}")
if nodes_cpuset_fields and len(nodes_cpuset_fields) > 0:
ssh_public_key = format_ssh_pub_key(
ssh_public_key, cpuset_full_path, job.user, job.user
Expand Down Expand Up @@ -467,8 +463,6 @@ def run(self, session, config):
head_node = config["COSYSTEM_HOSTNAME"]
elif "deploy" in job_types.keys():
head_node = config["DEPLOY_HOSTNAME"]
elif "envelope" in job_types.keys():
head_node = config["ENVELOPE_HOSTNAME"]

almighty_hostname = config["SERVER_HOSTNAME"]
if re.match(r"\s*localhost.*$", almighty_hostname) or re.match(
Expand Down Expand Up @@ -542,7 +536,6 @@ def run(self, session, config):
cpuset_full_path
and ("cosystem" not in job_types.keys())
and ("deploy" not in job_types.keys())
and ("envelope" not in job_types.keys())
and (len(hosts) > 0)
):
# for oarsh_shell connection
Expand Down
20 changes: 8 additions & 12 deletions oar/modules/node_change_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def run(self, session):
(event.type == "SWITCH_INTO_TERMINATE_STATE")
or (event.type == "SWITCH_INTO_ERROR_STATE")
) and (job.exit_code and (job.exit_code >> 8) == 99):
job_types = get_job_types(session, job_id) #TOREMOVE
job_types = get_job_types(session, job_id) # TOREMOVE
if "idempotent" in job_types.keys():
if (
job.reservation == "None"
Expand Down Expand Up @@ -199,17 +199,15 @@ def run(self, session):
else:
# If we exterminate a job and the cpuset feature is configured
# then the CPUSET clean will tell us which nodes are dead
if ("JOB_RESOURCE_MANAGER_PROPERTY_DB_FIELD" in config) and (
event.type == "EXTERMINATE_JOB"
):
if (
"JOB_RESOURCE_MANAGER_PROPERTY_DB_FIELD" in config
) and (event.type == "EXTERMINATE_JOB"):
hosts = []
add_new_event_with_host(
session, "LOG_SUSPECTED", 0, event.description, hosts
)
else:
# TODO recheck w/ OAR2 version to support other job_types (cosystem, noop, deploy)
if "envelope" in job_types.keys():
logger.warning(f"Job {job_id} is an envelope, event {event.type} is not handled")
# else:
# TODO recheck w/ OAR2 version to support other job_types (cosystem, noop, deploy)

if len(hosts) > 0:
already_treated_hosts = {}
Expand Down Expand Up @@ -270,11 +268,9 @@ def run(self, session):
"CANNOT_CREATE_TMP_DIRECTORY",
"LAUNCHING_OAREXEC_TIMEOUT",
]
# TODO recheck w/ OAR2 version to support other job_types (cosystem, noop, deploy)
if "envelope" in job_types.keys():
logger.warning(f"Job {job_id} is an envelope, event {event.type} is not handled")

if event.type in type_to_check and not ("envelope" in job_types.keys()):
# TODO recheck w/ OAR2 version to support other job_types (cosystem, noop, deploy)
if event.type in type_to_check:
if (
job.reservation == "None"
and job.type == "PASSIVE"
Expand Down
2 changes: 0 additions & 2 deletions oar/tools/oar.conf.in
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ DEPLOY_HOSTNAME="127.0.0.1"
# Specify where we are connected with a job of the cosystem type
COSYSTEM_HOSTNAME="127.0.0.1"

# Specify where we are connected with a envelope type job
ENVELOPE_HOSTNAME="127.0.0.1"
# Configuration for module: server, user, api

# Specify what system to use for the execution on the deploy and cosystem frontends
Expand Down
2 changes: 1 addition & 1 deletion oar/tools/oarexec
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ if ( $Job->{mode} eq "PASSIVE" ){
my $str;
($str, $stdin_script_to_send) = OAR::Tools::get_oarexecuser_perl_script_for_oarexec($Node_file,$Job_id,$Job->{array_id},$Job->{array_index},$Job->{user},$shell,$Job->{launching_directory},$Job->{stdout_file},$Job->{stderr_file},$Res_file,$Job->{name},$Job->{project},$Job->{walltime},$Job->{walltime_seconds},$Job->{job_env},,$Job->{types},$Job->{command});
@cmd = ("oardodo","perl","-e",$str);
if ((defined($Job->{types}->{deploy}) or defined($Job->{types}->{cosystem}) or defined($Job->{types}->{envelope})) and defined($Job->{deploy_cosystem_job_exec_system})) {
if ((defined($Job->{types}->{deploy}) or defined($Job->{types}->{cosystem})) and defined($Job->{deploy_cosystem_job_exec_system})) {
if ($Job->{deploy_cosystem_job_exec_system} eq "systemd-run") {
$oardo_become_user = "";
@cmd = ("oardodo","systemd-run","-q","--uid=$uid","--scope","--collect","--slice=user-$uid.slice","perl","-e",$str);
Expand Down
14 changes: 11 additions & 3 deletions tests/kao/test_db_kao.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def test_db_kao_envelope_1(monkeypatch, minimal_db_initialization, setup_config)
insert_job(
minimal_db_initialization,
types=["envelope"],
res=[(60, [("resource_id=0", "")])],
res=[(60, [("resource_id=1", "")])],
properties="",
)

Expand All @@ -116,7 +116,7 @@ def test_db_kao_envelope_1(monkeypatch, minimal_db_initialization, setup_config)
assert job.state == "toLaunch"

res = minimal_db_initialization.query(AssignedResource).one()
assert res.resource_id == 0
assert res.resource_id == 1


def test_db_kao_envelope_2(monkeypatch, minimal_db_initialization, setup_config):
Expand All @@ -125,7 +125,7 @@ def test_db_kao_envelope_2(monkeypatch, minimal_db_initialization, setup_config)
j1_id = insert_job(
minimal_db_initialization,
types=["envelope"],
res=[(60, [("resource_id=0", "")])],
res=[(60, [("resource_id=1", "")])],
properties="",
)

Expand All @@ -147,3 +147,11 @@ def test_db_kao_envelope_2(monkeypatch, minimal_db_initialization, setup_config)
# It seems better that the envelope must be running before launching the leaflet job.
# This situation could easily be avoided by checking before the db insert.
assert jobs[j2_id].state == "toLaunch"

res = minimal_db_initialization.query(AssignedResource).all()
for r in res:
print(r)

res = minimal_db_initialization.query(Resource).all()
for r in res:
print(r)
79 changes: 70 additions & 9 deletions tests/kao/test_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from oar.kao.slot import Slot, SlotSet
from oar.lib.globals import init_config
from oar.lib.job_handling import JobPseudo
from oar.lib.resource import MAX_NB_RESOURCES

config = init_config()

Expand Down Expand Up @@ -827,7 +826,7 @@ def test_schedule_error_2():


def test_schedule_envelope():
v = [(0, 100, ProcSet(*[(1, 32)]))]
v = [(0, 59, ProcSet(*[(2, 32)])), (60, 100, ProcSet(*[(1, 32)]))]

res = ProcSet(*[(1, 32)])
ss = SlotSet(Slot(1, 0, 0, res, 0, 100))
Expand All @@ -842,20 +841,24 @@ def test_schedule_envelope():
types={"envelope"},
deps=[],
key_cache={},
mld_res_rqts=[(1, 60, [([("resource_id", 0)], ProcSet(*res))])],
mld_res_rqts=[(1, 60, [([("resource_id", 1)], ProcSet(*res))])],
ts=False,
ph=0,
)

schedule_id_jobs_ct(all_ss, {1: j1}, hy, [1], 20)

# MAX_NB_RESOURCES-1 correspond to null resource (resource_id = 0 in database)
assert j1.res_set == ProcSet(MAX_NB_RESOURCES - 1)
assert j1.res_set == ProcSet((1, 1))
assert compare_slots_val_ref(ss, v) is True


def test_schedule_envelope_leaflet_1():
v = [(0, 19, ProcSet(*[(9, 32)])), (20, 100, ProcSet(*[(1, 32)]))]
v = [
(0, 19, ProcSet(*[(2, 8), (17, 32)])),
(20, 24, ProcSet(*[(2, 32)])),
(25, 100, ProcSet(*[(1, 32)])),
]

res = ProcSet(*[(1, 32)])
ss = SlotSet(Slot(1, 0, 0, res, 0, 100))
Expand All @@ -870,7 +873,7 @@ def test_schedule_envelope_leaflet_1():
types={"envelope"},
deps=[],
key_cache={},
mld_res_rqts=[(1, 25, [([("resource_id", 0)], ProcSet(*res))])],
mld_res_rqts=[(1, 25, [([("resource_id", 1)], ProcSet(*res))])],
ts=False,
ph=0,
)
Expand All @@ -887,9 +890,8 @@ def test_schedule_envelope_leaflet_1():

schedule_id_jobs_ct(all_ss, {1: j1, 2: j2}, hy, [1, 2], 5)

# MAX_NB_RESOURCES-1 correspond to null resource (resource_id = 0 in database)
assert j1.res_set == ProcSet(MAX_NB_RESOURCES - 1)
assert j2.res_set == ProcSet((1, 8))
assert j1.res_set == ProcSet(1)
assert j2.res_set == ProcSet((9, 16))
assert compare_slots_val_ref(ss, v) is True


Expand Down Expand Up @@ -937,3 +939,62 @@ def test_schedule_supersed_1():
assert j2.start_time == 1
assert j2.res_set == ProcSet((1, 16))
# assert compare_slots_val_ref(ss, v) is True


def test_schedule_leaflet_supersed():
    """A leaflet job with a ``supersed`` type takes over the resources of the
    running leaflet job it supersedes (here j3 supersedes j2), so its own
    request can be satisfied even though j2 still occupies resources.
    """
    res = ProcSet(*[(1, 32)])
    hy = {
        "resource_id": [ProcSet(x) for x in range(1, 32 + 1)],
        "node": [ProcSet(*x) for x in [[(1, 8)], [(9, 16)], [(17, 24)], [(25, 32)]]],
    }

    # Already-scheduled envelope job; its res_set is effectively empty.
    j1 = JobPseudo(
        id=1,
        start_time=1,
        walltime=1000,
        res_set=ProcSet(*[(0, 0)]),
        types={"envelope"},
        deps=[],
        key_cache={},
        ts=False,
        ph=0,
    )

    # Already-scheduled leaflet job that j3 will supersede.
    j2 = JobPseudo(
        id=2,
        start_time=1,
        walltime=10,
        res_set=ProcSet(*[(4, 20)]),
        types={"leaflet": "1"},
        ts=False,
        ph=0,
    )

    ss = SlotSet(Slot(1, 0, 0, ProcSet(*[(1, 32)]), 1, 100))
    all_ss = {"default": ss}

    set_slots_with_prev_scheduled_jobs(all_ss, [j1, j2], 10)

    # New leaflet job superseding j2: it is allowed to reuse j2's resources,
    # so a 4-node request fits immediately despite j2 still running.
    j3 = JobPseudo(
        id=3,
        types={"leaflet": "1", "supersed": "2"},
        deps=[],
        key_cache={},
        mld_res_rqts=[(1, 20, [([("node", 4)], ProcSet(*res))])],
        ts=False,
        ph=0,
        supersed=j2,
    )
    schedule_id_jobs_ct(all_ss, {3: j3}, hy, [3], 5)

    assert j3.start_time == 1
    assert j3.res_set == ProcSet((1, 32))

0 comments on commit eb03978

Please sign in to comment.