Skip to content

Commit 40e3f6e

Browse files
authored
Merge pull request #3464 from grondo/jobtap
jobtap: prototype job-manager plugin support
2 parents 669c9cc + 06bb235 commit 40e3f6e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+2568
-196
lines changed

doc/Makefile.am

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ MAN3_FILES_PRIMARY = \
109109
man3/flux_kvs_copy.3 \
110110
man3/flux_core_version.3 \
111111
man3/idset_create.3 \
112-
man3/idset_encode.3
112+
man3/idset_encode.3 \
113+
man3/flux_jobtap_get_flux.3
113114

114115
# These files are generated as clones of a primary page.
115116
# Sphinx handles this automatically if declared in the conf.py
@@ -253,7 +254,11 @@ MAN3_FILES_SECONDARY = \
253254
man3/idset_first.3 \
254255
man3/idset_next.3 \
255256
man3/idset_count.3 \
256-
man3/idset_equal.3
257+
man3/idset_equal.3 \
258+
man3/flux_jobtap_service_register.3 \
259+
man3/flux_jobtap_reprioritize_all.3 \
260+
man3/flux_jobtap_reprioritize_job.3 \
261+
man3/flux_jobtap_priority_unavail.3
257262

258263
MAN5_FILES = $(MAN5_FILES_PRIMARY)
259264

@@ -263,7 +268,8 @@ MAN5_FILES_PRIMARY = \
263268
MAN7_FILES = $(MAN7_FILES_PRIMARY)
264269

265270
MAN7_FILES_PRIMARY = \
266-
man7/flux-broker-attributes.7
271+
man7/flux-broker-attributes.7 \
272+
man7/flux-jobtap-plugins.7
267273

268274

269275
RST_FILES = \

doc/conf.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,14 @@ def setup(app):
344344
('man3/idset_encode','idset_encode', 'Convert idset to string and string to idset', [author], 3),
345345
('man3/idset_encode','idset_decode', 'Convert idset to string and string to idset', [author], 3),
346346
('man3/idset_encode','idset_ndecode', 'Convert idset to string and string to idset', [author], 3),
347+
('man3/flux_jobtap_get_flux','flux_jobtap_get_flux', 'Flux jobtap plugin interfaces', [author], 3),
348+
('man3/flux_jobtap_get_flux','flux_jobtap_service_register', 'Flux jobtap plugin interfaces', [author], 3),
349+
('man3/flux_jobtap_get_flux','flux_jobtap_reprioritize_all', 'Flux jobtap plugin interfaces', [author], 3),
350+
('man3/flux_jobtap_get_flux','flux_jobtap_reprioritize_job', 'Flux jobtap plugin interfaces', [author], 3),
351+
('man3/flux_jobtap_get_flux','flux_jobtap_priority_unavail', 'Flux jobtap plugin interfaces', [author], 3),
347352
('man5/flux-config-bootstrap', 'flux-config-bootstrap', 'configure Flux instance bootstrap', [author], 5),
348353
('man7/flux-broker-attributes', 'flux-broker-attributes', 'overview Flux broker attributes', [author], 7),
354+
('man7/flux-jobtap-plugins', 'flux-jobtap-plugins', 'overview Flux jobtap plugin API', [author], 7),
349355
]
350356

351357
# -- Options for Intersphinx -------------------------------------------------

doc/man3/flux_jobtap_get_flux.rst

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
=======================
2+
flux_jobtap_get_flux(3)
3+
=======================
4+
5+
6+
SYNOPSIS
7+
========
8+
9+
::
10+
11+
#include <flux/core.h>
12+
#include <flux/jobtap.h>
13+
14+
::
15+
16+
flux_t *flux_jobtap_get_flux (flux_plugin_t *p);
17+
18+
::
19+
20+
int flux_jobtap_service_register (flux_plugin_t *p,
21+
const char *method,
22+
flux_msg_handler_f cb,
23+
void *arg);
24+
25+
::
26+
27+
int flux_jobtap_reprioritize_all (flux_plugin_t *p);
28+
29+
::
30+
31+
int flux_jobtap_reprioritize_job (flux_plugin_t *p,
32+
flux_jobid_t id,
33+
unsigned int priority);
34+
35+
::
36+
37+
int flux_jobtap_priority_unavail (flux_plugin_t *p,
38+
flux_plugin_arg_t *args);
39+
40+
41+
DESCRIPTION
42+
===========
43+
44+
These interfaces are used by Flux *jobtap* plugins which are used to
45+
extend the job manager broker module.
46+
47+
``flux_jobtap_get_flux()`` returns the job manager's Flux handle given
48+
the plugin's ``flux_plugin_t *``. This can be used by a *jobtap* plugin
49+
to send RPCs, schedule timer watchers, or perform other asynchronous work.
50+
51+
``flux_jobtap_service_register()`` registers a service name ``method``
52+
under the job manager which will be handled by the provided message
53+
handler ``cb``. The constructed service name will be
54+
``job-manager.<name>.<method>`` where ``name`` is the name of the plugin
55+
as returned by ``flux_plugin_get_name(3)``. As such, this call may
56+
fail if the *jobtap* plugin has not yet set a name for itself using
57+
``flux_plugin_set_name(3)``.
58+
59+
``flux_jobtap_reprioritize_all()`` requests that the job manager begin
60+
reprioritization of all pending jobs, i.e. jobs in the PRIORITY and
61+
SCHED states. This will result in each job having a ``job.priority.get``
62+
callback invoked on it.
63+
64+
``flux_jobtap_reprioritize_job()`` allows a *jobtap* plugin to asynchronously
65+
assign the priority of a job.
66+
67+
``flux_jobtap_priority_unavail()`` is a convenience function which may
68+
be used by a plugin in the ``job.state.priority`` callback to
69+
indicate that a priority for the job is not yet available. It can be
70+
called as::
71+
72+
return flux_jobtap_priority_unavail (p, args);
73+
74+
75+
RETURN VALUE
76+
============
77+
78+
``flux_jobtap_get_flux()`` returns a ``flux_t *`` handle on success. ``NULL``
79+
is returned with errno set to ``EINVAL`` if the supplied ``flux_plugin_t *p``
80+
is not a jobtap plugin handle.
81+
82+
The remaining functions return 0 on success, -1 on failure.
83+
84+
RESOURCES
85+
=========
86+
87+
Github: http://github.com/flux-framework
88+
89+
90+
SEE ALSO
91+
========
92+
93+
flux-jobtap-plugins(7)

doc/man3/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ man3
7272
flux_zmq_watcher_create
7373
idset_create
7474
idset_encode
75+
flux_jobtap_get_flux
7576

7677
.. toctree::
7778
:hidden:

doc/man7/flux-jobtap-plugins.rst

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
======================
2+
flux-jobtap-plugins(7)
3+
======================
4+
5+
6+
DESCRIPTION
7+
===========
8+
9+
The *jobtap* interface supports loading of builtin and external
10+
plugins into the job manager broker module. These plugins can be used
11+
to assign job priorities using algorithms other than the default,
12+
aid in debugging of the flow of job states, or generically extend
13+
the functionality of the job manager.
14+
15+
NOTE:
16+
Currently only a single jobtap plugin may be loaded into the job manager
17+
at a time.
18+
19+
Jobtap plugins are defined using the Flux standard plugin format. Therefore
20+
a jobtap plugin should export the single symbol: ``flux_plugin_init()``,
21+
from which calls to ``flux_plugin_add_handler(3)`` should be used to
22+
register functions which will be called for the callback topic strings
23+
described in the :ref:`callback_topics` section below.
24+
25+
Each callback function uses the Flux standard plugin callback form, e.g.::
26+
27+
int callback (flux_plugin_t *p,
28+
const char *topic,
29+
flux_plugin_arg_t *args,
30+
void *arg);
31+
32+
where ``p`` is the handle for the current *jobtap* plugin, ``topic`` is
33+
the *topic string* for the currently invoked callback, ``args`` contains
34+
a set of plugin arguments which may be unpacked with the
35+
``flux_plugin_arg_unpack(3)`` call, and ``arg`` is any opaque argument
36+
passed along when registering the handler.
37+
38+
JOBTAP PLUGIN ARGUMENTS
39+
=======================
40+
41+
For job-specific callbacks, all job data is passed to the plugin via
42+
the ``flux_plugin_arg_t *args``, and return data is sent back to the
43+
job manager via the same ``args``. Incoming arguments may be unpacked
44+
using ``flux_plugin_arg_unpack(3)``, e.g.::
45+
46+
rc = flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN,
47+
"{s{s:o}, s:I}",
48+
"jobspec", "resources", &resources,
49+
"id", &id);
50+
51+
will unpack the ``resources`` section of jobspec and the jobid into
52+
``resources`` and ``id`` respectively.
53+
54+
The full list of available args includes the following:
55+
56+
========== ==== ==========================================
57+
name type description
58+
========== ==== ==========================================
59+
jobspec o jobspec with environment redacted
60+
id I jobid
61+
state i current job state
62+
prev_state i previous state (``job.state.*`` callbacks)
63+
userid i userid
64+
urgency i current urgency
65+
priority I current priority
66+
t_submit f submit timestamp in floating point seconds
67+
entry o posted eventlog entry, including context
68+
========== ==== ==========================================
69+
70+
Return arguments can be packed using the ``FLUX_PLUGIN_ARG_OUT`` and
71+
optionally ``FLUX_PLUGIN_ARG_UPDATE`` flags. For example to return
72+
a priority::
73+
74+
rc = flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT,
75+
"{s:I}",
76+
"priority", (int64_t) priority);
77+
78+
While a job is pending, *jobtap* plugin callbacks may also add job
79+
annotations by returning a value for the ``annotations`` key::
80+
81+
flux_plugin_arg_pack (args, FLUX_PLUGIN_ARG_OUT|FLUX_PLUGIN_ARG_UPDATE,
82+
"{s:{s:s}}",
83+
"annotations", "test", value);
84+
85+
.. _callback_topics:
86+
87+
CALLBACK TOPICS
88+
===============
89+
90+
The following callback "topic strings" are currently provided by the
91+
*jobtap* interface:
92+
93+
job.new
94+
The ``job.new`` topic is used by the job manager to notify a jobtap plugin
95+
about a newly introduced job. This call may be made in three different
96+
situations:
97+
98+
1. on job submission
99+
2. when the job manager is restarted and has reloaded a job from the KVS
100+
3. when a new jobtap plugin is loaded
101+
102+
In case 1 above, the job state will always be ``FLUX_JOB_STATE_NEW``, while
103+
jobs in cases 2 and 3 can be in any state except ``FLUX_JOB_STATE_INACTIVE``.
104+
105+
job.state.*
106+
The ``job.state.*`` callbacks are made just after a job state transition.
107+
The callback is made after the state has been published to the job's
108+
eventlog, but before any action has been taken on that state (since the
109+
action could involve immediately transitioning to a new state).
110+
111+
job.state.priority
112+
The callback for ``FLUX_JOB_STATE_PRIORITY`` is special, in that a plugin
113+
must return a priority at the end of the callback (if the plugin is
114+
a priority-managing plugin). If no priority is returned from this callback,
115+
then the job manager assumes the plugin does not set job priorities,
116+
and will take default action. If the job priority is not available, the
117+
plugin should instead use ``flux_jobtap_priority_unavail()`` to indicate
118+
that the priority cannot be set. Jobs that do not have a priority will
119+
remain in the PRIORITY state until a priority is assigned, so a plugin
120+
should arrange for the priority to be set asynchronously using
121+
``flux_jobtap_reprioritize_job()``.
122+
123+
job.priority.get
124+
The job manager calls the ``job.priority.get`` topic whenever it wants
125+
to update the job priority of a single job. The plugin should return a
126+
priority immediately, but if one is not available when a job is in
127+
the PRIORITY state, the plugin may use ``flux_jobtap_priority_unavail()``
128+
to indicate the priority is not available. Returning an unavailable
129+
priority in the SCHED state is an error and it will be logged, but
130+
otherwise ignored.
131+
132+
RESOURCES
133+
=========
134+
135+
Github: http://github.com/flux-framework
136+
137+
138+
SEE ALSO
139+
========
140+
141+

doc/man7/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ man7
66
:maxdepth: 1
77

88
flux-broker-attributes
9+
flux-jobtap-plugins

doc/test/spell.en.pws

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,3 +527,8 @@ dirname
527527
jps
528528
xargs
529529
sep
530+
RPCs
531+
jobtap
532+
unavail
533+
reprioritize
534+
reprioritization

src/Makefile.am

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ noinst_HEADERS = \
1212
include/flux/idset.h \
1313
include/flux/schedutil.h \
1414
include/flux/shell.h \
15-
include/flux/hostlist.h
15+
include/flux/hostlist.h \
16+
include/flux/jobtap.h

src/bindings/python/flux/job/Jobspec.py

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import yaml
2121

2222
from _flux._core import ffi
23-
from flux.util import parse_fsd
23+
from flux.util import parse_fsd, set_treedict
2424

2525

2626
def _convert_jobspec_arg_to_string(jobspec):
@@ -392,22 +392,12 @@ def _set_io_path(self, iotype, stream_name, path):
392392
self.setattr_shell_option("{}.{}.type".format(iotype, stream_name), "file")
393393
self.setattr_shell_option("{}.{}.path".format(iotype, stream_name), path)
394394

395-
def _set_treedict(self, in_dict, key, val):
396-
"""
397-
_set_treedict(d, "a.b.c", 42) is like d[a][b][c] = 42
398-
but levels are created on demand.
399-
"""
400-
path = key.split(".", 1)
401-
if len(path) == 2:
402-
self._set_treedict(in_dict.setdefault(path[0], {}), path[1], val)
403-
else:
404-
in_dict[key] = val
405-
406395
def setattr(self, key, val):
396+
407397
"""
408398
set job attribute
409399
"""
410-
self._set_treedict(self.jobspec, "attributes." + key, val)
400+
set_treedict(self.jobspec, "attributes." + key, val)
411401

412402
def setattr_shell_option(self, key, val):
413403
"""

src/bindings/python/flux/job/info.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ class JobInfo:
106106
"t_inactive": 0.0,
107107
"expiration": 0.0,
108108
"nnodes": "",
109+
"priority": "",
109110
"ranks": "",
110111
"nodelist": "",
111112
"success": "",

src/bindings/python/flux/util.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,44 @@ def _format_action_invocation(self, action):
147147
return lambda prog: FluxHelpFormatter(prog, max_help_position=argwidth)
148148

149149

150+
def set_treedict(in_dict, key, val):
    """
    Set a value in a nested dictionary using a dotted key path.

    ``set_treedict(d, "a.b.c", 42)`` is like ``d["a"]["b"]["c"] = 42``
    but levels are created on demand.

    The path is walked iteratively, so the nesting depth of ``key`` is
    not bounded by the interpreter's recursion limit.

    :param in_dict: dictionary to update in place
    :param key: key for the value; each ``.`` introduces one nesting level
    :param val: value stored under the final path component
    """
    *parents, leaf = key.split(".")
    node = in_dict
    for name in parents:
        # Reuse an existing level if present, otherwise create it.
        node = node.setdefault(name, {})
    node[leaf] = val
161+
162+
class TreedictAction(argparse.Action):
    """
    argparse action that returns a dictionary from a set of key=value option
    arguments. The ``flux.util.set_treedict()`` function is used to set
    ``key`` in the dictionary, which allows ``key`` to have embedded dots.
    e.g. ``a.b.c=42`` is like::

       dest[a][b][c] = 42

    but levels are created on demand.

    Each value is decoded as JSON when possible (so ``42`` becomes an int
    and ``true`` a bool); anything that is not valid JSON is stored as the
    literal string.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """
        Build a dictionary from ``values`` and store it on ``namespace``
        under ``self.dest``.

        :raises ValueError: if an argument does not contain ``=``
        """
        # When the option is declared without ``nargs``, argparse passes a
        # single string instead of a list. Wrap it so the loop below does
        # not iterate over the string one character at a time.
        if isinstance(values, str):
            values = [values]
        result = {}
        for arg in values:
            key, sep, text = arg.partition("=")
            if not sep:
                raise ValueError(f"Missing required value for key {arg}")
            try:
                val = json.loads(text)
            except (json.JSONDecodeError, TypeError):
                # Not valid JSON: keep the raw string.
                val = text
            set_treedict(result, key, val)
        setattr(namespace, self.dest, result)
186+
187+
150188
class CLIMain(object):
151189
def __init__(self, logger=None):
152190
if logger is None:

0 commit comments

Comments
 (0)