diff --git a/doc/man5/flux-config-accounting.rst b/doc/man5/flux-config-accounting.rst index 7252fcd5..1f10e4b3 100644 --- a/doc/man5/flux-config-accounting.rst +++ b/doc/man5/flux-config-accounting.rst @@ -6,6 +6,9 @@ flux-config-accounting(5) DESCRIPTION =========== +accounting.factor-weights +------------------------- + The flux-accounting priority plugin can be configured to assign different weights to the different factors used when calculating a job's priority. Assigning a higher weight to a factor will result in it having more @@ -24,7 +27,7 @@ The ``accounting.factor-weights`` sub-table may contain the following keys: KEYS -==== +^^^^ fairshare Integer value that represents the weight to be associated with an @@ -36,10 +39,36 @@ queue EXAMPLE -======= +^^^^^^^ :: [accounting.factor-weights] fairshare = 10000 queue = 1000 + +accounting.queue-priorities +--------------------------- + +The priority plugin's queues can also be configured to have a different +associated integer priority in the TOML file. Queues can positively or +negatively affect a job's calculated priority depending on the priority +assigned to each one. By default, queues have an associated priority of 0, +meaning they do not affect a job's priority at all. + +If a queue defined in the config file is unknown to the priority plugin, it +will add the queue to its internal map. Otherwise, it will update the queue's +priority with the new value. + +The ``accounting.queue-priorities`` sub-table should list any configured queue +as the key and its associated integer priority as the value. + +EXAMPLE +^^^^^^^ + +:: + + [accounting.queue-priorities] + bronze=100 + silver=200 + gold=300 diff --git a/src/plugins/mf_priority.cpp b/src/plugins/mf_priority.cpp index c8123fea..03bc2dd8 100644 --- a/src/plugins/mf_priority.cpp +++ b/src/plugins/mf_priority.cpp @@ -244,7 +244,8 @@ static int decrement_resources (Association *b, json_t *jobspec) /* * Get config information about the various priority factor weights - * and assign them in the priority_weights map. + * and assign them in the priority_weights map. Update the queues map with + * any defined integer priorities. */ static int conf_update_cb (flux_plugin_t *p, const char *topic, @@ -252,15 +253,17 @@ static int conf_update_cb (flux_plugin_t *p, void *data) { int fshare_weight = -1, queue_weight = -1; + json_t *queue_priorities = nullptr; flux_t *h = flux_jobtap_get_flux (p); // unpack the various factors to be used in job priority calculation if (flux_plugin_arg_unpack (args, FLUX_PLUGIN_ARG_IN, - "{s?{s?{s?{s?i, s?i}}}}", + "{s?{s?{s?{s?i, s?i}, s?o}}}", "conf", "accounting", "factor-weights", "fairshare", &fshare_weight, - "queue", &queue_weight) < 0) { + "queue", &queue_weight, + "queue-priorities", &queue_priorities) < 0) { flux_log_error (flux_jobtap_get_flux (p), "mf_priority: conf.update: flux_plugin_arg_unpack: %s", flux_plugin_arg_strerror (args)); @@ -273,6 +276,32 @@ static int conf_update_cb (flux_plugin_t *p, if (queue_weight != -1) priority_weights["queue"] = queue_weight; + if (queue_priorities) { + const char *key; + json_t *value; + + json_object_foreach (queue_priorities, key, value) { + if (!key || !json_is_integer (value)) { + flux_log_error (h, "mf_priority: invalid data in queue-priorities"); + continue; + } + + const std::string queue_name (key); + int priority = json_integer_value (value); + + auto it = queues.find (queue_name); + if (it != queues.end ()) { + // update the priority for the existing queue + it->second.priority = priority; + } else { + // queue does not exist; create a new queue + Queue new_queue; + new_queue.priority = priority; + queues[queue_name] = new_queue; + } + } + } + return 0; } diff --git a/t/Makefile.am b/t/Makefile.am index aaf32839..4f451c17 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -48,6 +48,7 @@ TESTSCRIPTS = \ t1046-issue565.t \ t1047-issue564.t \ t1048-issue575.t \ + t1049-mf-priority-queue-config.t \ t5000-valgrind.t \ python/t1000-example.py \ python/t1001_db.py \ diff --git a/t/t1049-mf-priority-queue-config.t b/t/t1049-mf-priority-queue-config.t new file mode 100755 index 00000000..90348c17 --- /dev/null +++ b/t/t1049-mf-priority-queue-config.t @@ -0,0 +1,119 @@ +#!/bin/bash + +test_description='test configuring priorities of queues in priority plugin' + +. `dirname $0`/sharness.sh + +mkdir -p conf.d + +MULTI_FACTOR_PRIORITY=${FLUX_BUILD_DIR}/src/plugins/.libs/mf_priority.so +SUBMIT_AS=${SHARNESS_TEST_SRCDIR}/scripts/submit_as.py +SEND_PAYLOAD=${SHARNESS_TEST_SRCDIR}/scripts/send_payload.py +DB_PATH=$(pwd)/FluxAccountingTest.db + +export TEST_UNDER_FLUX_NO_JOB_EXEC=y +export TEST_UNDER_FLUX_SCHED_SIMPLE_MODE="limited=1" +test_under_flux 1 job -o,--config-path=$(pwd)/conf.d + +flux setattr log-stderr-level 1 + +test_expect_success 'allow guest access to testexec' ' + flux config load <<-EOF + [exec.testexec] + allow-guests = true + EOF +' + +test_expect_success 'load priority plugin' ' + flux jobtap load ${MULTI_FACTOR_PRIORITY} +' + +test_expect_success 'create a flux-accounting DB' ' + flux account -p ${DB_PATH} create-db +' + +test_expect_success 'start flux-accounting service' ' + flux account-service -p ${DB_PATH} -t +' + +test_expect_success 'add some banks to the DB' ' + flux account add-bank root 1 && + flux account add-bank --parent-bank=root bankA 1 +' + +test_expect_success 'add some queues to the DB with configured priorities' ' + flux account add-queue bronze --priority=100 && + flux account add-queue silver --priority=500 && + flux account add-queue gold --priority=1000 +' + +test_expect_success 'add a user to the DB' ' + flux account add-user \ + --username=user5001 \ + --userid=5001 \ + --bank=bankA \ + --queues=bronze,silver +' + +test_expect_success 'send flux-accounting DB information to the plugin' ' + flux account-priority-update -p ${DB_PATH} +' + +test_expect_success 'stop the queue' ' + flux queue stop +' + +test_expect_success 'configure flux with queues' ' + cat >conf.d/queues.toml <<-EOT && + [queues.bronze] + [queues.silver] + [queues.gold] + EOT + flux config reload && + flux queue stop --all +' + +test_expect_success 'submit a job to bronze queue' ' + job=$(flux python ${SUBMIT_AS} 5001 -n1 --queue=bronze hostname) && + flux job wait-event -f json ${job} priority | jq '.context.priority' > job.priority && + test $(cat job.priority) -eq 1050000 && + flux cancel ${job} +' + +test_expect_success 'decrease priority for the bronze queue in config' ' + cat >conf.d/test.toml <<-EOT && + [accounting.queue-priorities] + bronze = 0 + EOT + flux config reload && + flux queue stop --all +' + +test_expect_success 'submit another job to bronze queue; priority is negatively affected' ' + job=$(flux python ${SUBMIT_AS} 5001 -n1 --queue=bronze hostname) && + flux job wait-event -f json ${job} priority | jq '.context.priority' > job.priority && + test $(cat job.priority) -eq 50000 && + flux cancel ${job} +' + +test_expect_success 'increase priority for the bronze queue in config' ' + cat >conf.d/test.toml <<-EOT && + [accounting.queue-priorities] + bronze = 123 + EOT + flux config reload && + flux queue stop --all +' + +test_expect_success 'submit another job to bronze queue; priority is positively affected' ' + job=$(flux python ${SUBMIT_AS} 5001 -n1 --queue=bronze hostname) && + flux job wait-event -f json ${job} priority | jq '.context.priority' > job.priority && + test $(cat job.priority) -eq 1280000 && + flux cancel ${job} +' + +test_expect_success 'shut down flux-accounting service' ' + flux python -c "import flux; flux.Flux().rpc(\"accounting.shutdown_service\").get()" +' + +test_done