I have dug into this a bit more now. First, the error is clearly bubbling up from NNG, so maybe it would be better to ask @shikokuchuo about it. Second, I found that this error often occurs when SLURM starts multiple worker jobs at close to the same time. In my actual use case, job allocations would wait for several minutes, then 2-15 would start all within one second as resources became available. Some of these would always die within 3-8 seconds (including the time to start R) with error 31, but it did sometimes happen that more than one worker per second would succeed. I made a test using 100 workers with very short, small worker job allocations which SLURM can fulfill almost immediately, and reliability drops considerably: out of the 100, only two succeeded; the first was the one which was started to run … In this case, 51 (slightly more than half) of the worker jobs failed with error 31. The test pipeline:

```r
library(targets)
tar_option_set(
packages = c("tibble"),
format = "qs",
deployment = "worker",
storage = "worker",
retrieval = "worker",
error = "abridge",
controller = crew.cluster::crew_controller_slurm(
name = "test_crew",
seconds_launch = 7200,
workers = 100,
tasks_max = 1000,
garbage_collection = TRUE,
launch_max = 3,
verbose = TRUE,
script_lines = readLines("slurm/puhti_crew.tmpl"),
slurm_log_output = "crew-%A.out",
slurm_log_error = NULL,
slurm_memory_gigabytes_per_cpu = 1,
slurm_cpus_per_task = 1,
slurm_time_minutes = 10,
slurm_partition = "small",
host = system2("ifdata", args = c("-pa", "ib0"), stdout = TRUE) # ip address of ib0
)
)
list(
tar_target(
name = data,
command = tibble(x = rnorm(100), y = rnorm(100))
),
tar_target(
name = test,
command = {
Sys.sleep(10)
dplyr::mutate(data, z = sqrt(x*x + y*y))
},
pattern = map(data)
)
)
```

Conversely, running that test pipeline with 5 workers, all succeeded. The 100 worker version worked fine with …
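As a sketch of how one might confirm the burst of simultaneous starts (my illustration, not part of the original report; it assumes `sacct` is on the `PATH`, and the `--name` filter below is a placeholder to adjust to whatever job name the launcher assigns):

```r
# Query start times of recent worker jobs; adjust --name to match the
# job name your crew.cluster launcher assigns (placeholder here).
starts <- system2(
  "sacct",
  args = c("--parsable2", "--noheader",
           "--name=test_crew",
           "--format=JobID,Start,State"),
  stdout = TRUE
)
jobs <- read.delim(
  text = starts, sep = "|", header = FALSE,
  col.names = c("job_id", "start", "state")
)
# Start times have one-second resolution, so counts > 1 reveal jobs
# that SLURM launched within the same second.
table(jobs$start)
```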
---
I implemented an alternate version of the `crew.cluster` SLURM launcher which chains the worker jobs together with `sbatch --dependency=after:<previous job ID>`, so that each worker job only starts after the previous one has begun (see the code below). Another downside here is that it is no longer possible to run …

```r
# return y if x is NULL, else x (null-coalescing helper)
`%|||%` <- function(x, y) {
if (is.null(x)) {
y
} else {
x
}
}
crew_launcher_slurm2 <- function(
name = NULL,
seconds_interval = 0.5,
seconds_timeout = 60,
seconds_launch = 86400,
seconds_idle = Inf,
seconds_wall = Inf,
tasks_max = Inf,
tasks_timers = 0L,
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
launch_max = 5L,
tls = crew::crew_tls(mode = "automatic"),
verbose = FALSE,
command_submit = as.character(Sys.which("sbatch")),
command_terminate = as.character(Sys.which("scancel")),
command_delete = NULL,
script_directory = tempdir(),
script_lines = character(0L),
slurm_log_output = "/dev/null",
slurm_log_error = "/dev/null",
slurm_memory_gigabytes_per_cpu = NULL,
slurm_cpus_per_task = NULL,
slurm_time_minutes = 1440,
slurm_partition = NULL
) {
name <- as.character(name %|||% crew::crew_random_name())
if (!is.null(command_delete)) {
crew::crew_deprecate(
name = "command_delete",
date = "2023-01-08",
version = "0.1.4.9001",
alternative = "command_terminate"
)
command_terminate <- command_delete
}
launcher <- crew_class_launcher_slurm2$new(
name = name,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout,
seconds_launch = seconds_launch,
seconds_idle = seconds_idle,
seconds_wall = seconds_wall,
tasks_max = tasks_max,
tasks_timers = tasks_timers,
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
launch_max = launch_max,
tls = tls,
verbose = verbose,
command_submit = command_submit,
command_terminate = command_terminate,
script_directory = script_directory,
script_lines = script_lines,
slurm_log_output = slurm_log_output,
slurm_log_error = slurm_log_error,
slurm_memory_gigabytes_per_cpu = slurm_memory_gigabytes_per_cpu,
slurm_cpus_per_task = slurm_cpus_per_task,
slurm_time_minutes = slurm_time_minutes,
slurm_partition = slurm_partition
)
launcher$validate()
launcher
}
crew_class_launcher_slurm2 <- R6::R6Class(
classname = "crew_class_launcher_slurm2",
inherit = crew.cluster::crew_class_launcher_slurm,
private = list(
.prefix = NULL,
#### added ####
# job ID of the most recently submitted worker job
.last_job = NULL,
# sbatch arguments: --parsable captures the job ID, and a dependency on the
# previously submitted job makes SLURM stagger the worker starts
.args_launch = function(script) {
shQuote(
c(
"--parsable",
if (!is.null(private$.last_job)) {
paste0("--dependency=after:", private$.last_job)
},
script
)
)
}
#### end added ####
),
public = list(
# same as the upstream launch_worker() except for the marked changes below
launch_worker = function(call, name, launcher, worker, instance) {
lines <- c(self$script(name = name), paste("R -e", shQuote(call)))
if (is.null(private$.prefix)) {
if (!file.exists(super$script_directory)) {
dir.create(super$script_directory, recursive = TRUE)
}
private$.prefix <- crew::crew_random_name()
}
script <- crew.cluster:::path_script(
dir = super$script_directory,
prefix = private$.prefix,
launcher = launcher,
worker = worker
)
writeLines(text = lines, con = script)
#### modified ####
launch_result <- system2(
command = super$command_submit,
args = private$.args_launch(script = script),
stdout = TRUE,
stderr = crew.cluster:::if_any(super$verbose, "", FALSE),
wait = TRUE
)
if (is.null(attr(launch_result, "status"))) {
# `sbatch --parsable` returns JobID, optionally followed by semicolon and cluster name.
# add more sanity checks here
job <- strsplit(launch_result, ";")[[1]][1]
private$.last_job <- job
}
if (any(super$verbose)) cat(launch_result, "\n")
#### end modified ####
list(name = name, script = script)
}
)
)
crew_controller_slurm2 <- function(
name = NULL,
workers = 1L,
host = NULL,
port = NULL,
tls = crew::crew_tls(mode = "automatic"),
tls_enable = NULL,
tls_config = NULL,
seconds_interval = 0.25,
seconds_timeout = 60,
seconds_launch = 86400,
seconds_idle = Inf,
seconds_wall = Inf,
seconds_exit = NULL,
tasks_max = Inf,
tasks_timers = 0L,
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
launch_max = 5L,
verbose = FALSE,
command_submit = as.character(Sys.which("sbatch")),
command_terminate = as.character(Sys.which("scancel")),
command_delete = NULL,
script_directory = tempdir(),
script_lines = character(0L),
slurm_log_output = "/dev/null",
slurm_log_error = "/dev/null",
slurm_memory_gigabytes_per_cpu = NULL,
slurm_cpus_per_task = NULL,
slurm_time_minutes = 1440,
slurm_partition = NULL
) {
if (!is.null(seconds_exit)) {
crew::crew_deprecate(
name = "seconds_exit",
date = "2023-09-21",
version = "0.5.0.9002",
alternative = "none (no longer necessary)"
)
}
client <- crew::crew_client(
name = name,
workers = workers,
host = host,
port = port,
tls = tls,
tls_enable = tls_enable,
tls_config = tls_config,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout
)
launcher <- crew_launcher_slurm2(
name = name,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout,
seconds_launch = seconds_launch,
seconds_idle = seconds_idle,
seconds_wall = seconds_wall,
tasks_max = tasks_max,
tasks_timers = tasks_timers,
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
launch_max = launch_max,
tls = tls,
verbose = verbose,
command_submit = command_submit,
command_terminate = command_terminate,
command_delete = command_delete,
script_directory = script_directory,
script_lines = script_lines,
slurm_log_output = slurm_log_output,
slurm_log_error = slurm_log_error,
slurm_memory_gigabytes_per_cpu = slurm_memory_gigabytes_per_cpu,
slurm_cpus_per_task = slurm_cpus_per_task,
slurm_time_minutes = slurm_time_minutes,
slurm_partition = slurm_partition
)
controller <- crew::crew_controller(client = client, launcher = launcher)
controller$validate()
controller
}
```
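A usage sketch (illustrative, with placeholder argument values): once the definitions above are sourced, the drop-in controller goes into `tar_option_set()` just like the stock `crew.cluster::crew_controller_slurm()`:

```r
# Assumes crew_launcher_slurm2() and crew_controller_slurm2() defined above
# have been sourced into the session that evaluates _targets.R.
library(targets)
tar_option_set(
  controller = crew_controller_slurm2(
    name = "test_crew",
    workers = 100,
    seconds_launch = 7200,
    slurm_partition = "small",
    slurm_time_minutes = 10
  )
)
```

Because each submission depends on the previous job having started, SLURM staggers the worker launches instead of starting a whole burst within the same second.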
---
I am running a `targets` pipeline using `crew.cluster` on SLURM. I notice that fewer workers are actually running than have been requested. Looking at the logs of the "missing" workers, I find the NNG error 31 discussed in the replies above. As far as I can tell, this error occurs when the workers first dial in to the controller, so the controller is unaware that they have ever launched. I am using `seconds_launch = 7200` because the queue time on my cluster can be quite long, so it takes a long time for the controller to give up and relaunch these workers.

Any idea what could be causing this error?
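As an aside, the numeric code in these NNG errors can be translated to a message string with the `nanonext` package (a sketch, assuming `nanonext` is installed):

```r
# Translate NNG error code 31 into its human-readable message.
nanonext::nng_error(31L)
#> [1] "Connection shut down"
```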