Improve support for spot scale-sets
Add an interactive thread and a loop on that thread that checks for spot eviction.
samtkaplan committed Sep 8, 2023
1 parent 75514e0 commit f753629
Showing 2 changed files with 154 additions and 30 deletions.
124 changes: 95 additions & 29 deletions src/AzManagers.jl
@@ -144,7 +144,7 @@ function azrequest(rtype, verbose, url, headers, body=nothing)
end

options = (retry=false, status_exception=false)
if body == nothing
if body === nothing
r = HTTP.request(rtype, url, headers; verbose=verbose, options...)
else
r = HTTP.request(rtype, url, headers, body; verbose=verbose, options...)
@@ -514,7 +514,16 @@ function process_pending_connections()
end

let socket = _socket
@async addprocs(manager; socket)
@async begin
pids = addprocs(manager; socket)
for pid in pids
wrkr = Distributed.map_pid_wrkr[pid]
if isdefined(wrkr, :config) && isdefined(wrkr.config, :userdata) && lowercase(get(wrkr.config.userdata, "priority", "")) == "spot"
remotecall(machine_prempt_loop, pid) # monitor for Azure spot evictions on each machine
end
end
pids
end
end
end
end
@@ -558,6 +567,14 @@ function spinner(n_target_workers)
nothing
end

function nthreads_filter(nthreads)
_nthreads = split(string(nthreads), ',')
nthreads_default = length(_nthreads) > 0 ? parse(Int, _nthreads[1]) : 1
nthreads_interactive = length(_nthreads) > 1 ? parse(Int, _nthreads[2]) : 0

nthreads_interactive > 0 ? string("$nthreads_default,$nthreads_interactive") : string(nthreads_default)
end
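
For orientation, a sketch of how this helper normalizes the `julia_num_threads` setting; the inputs below are illustrative:

```julia
# `nthreads_filter` accepts either an Int or a "default,interactive" string and
# returns the canonical string form used by the startup scripts.
nthreads_filter(8)      # "8"
nthreads_filter("4,2")  # "4,2"
nthreads_filter("4,0")  # "4"  (a zero interactive count is dropped)
```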

"""
addprocs(template, ninstances[; kwargs...])
@@ -576,7 +593,7 @@ method or a string corresponding to a template stored in `~/.azmanagers/template
* `group="cbox"` The name of the Azure scale set. If the scale set does not yet exist, it will be created.
* `overprovision=true` Use Azure scale-set overprovisioning?
* `ppi=1` The number of Julia processes to start per Azure scale set instance.
* `julia_num_threads=Threads.nthreads()` set the number of Julia threads to run on each worker
* `julia_num_threads="\$(Threads.nthreads(),\$(Threads.nthreads(:interactive))"` set the number of julia threads for the detached process.[2]
* `omp_num_threads=get(ENV, "OMP_NUM_THREADS", 1)` set the number of OpenMP threads to run on each worker
* `env=Dict()` each dictionary entry is an environment variable set on the worker before Julia starts. e.g. `env=Dict("OMP_PROC_BIND"=>"close")`
* `nretry=20` Number of retries for HTTP REST calls to Azure services.
@@ -587,22 +604,23 @@ method or a string corresponding to a template stored in `~/.azmanagers/template
* `maxprice=-1` set maximum price per hour for a VM in the scale-set. `-1` uses the market price.
* `spot_base_regular_priority_count=0` If spot is true, only start adding spot machines once there are this many non-spot machines added.
* `spot_regular_percentage_above_base` If spot is true, then when adding new machines (above `spot_base_regular_priority_count`) use regular (non-spot) priority for this percentage of new machines.
* `waitfor=false` wait for the cluster to be provisioned before returning, or return control to the caller immediately[2]
* `mpi_ranks_per_worker=0` set the number of MPI ranks per Julia worker[3]
* `waitfor=false` wait for the cluster to be provisioned before returning, or return control to the caller immediately[3]
* `mpi_ranks_per_worker=0` set the number of MPI ranks per Julia worker[4]
* `mpi_flags="-bind-to core:\$(ENV["OMP_NUM_THREADS"]) -map-by numa"` extra flags to pass to mpirun (has effect when `mpi_ranks_per_worker>0`)
* `nvidia_enable_ecc=true` on NVIDIA machines, ensure that ECC is set to `true` or `false` for all GPUs[4]
* `nvidia_enable_mig=false` on NVIDIA machines, ensure that MIG is set to `true` or `false` for all GPUs[4]
* `nvidia_enable_ecc=true` on NVIDIA machines, ensure that ECC is set to `true` or `false` for all GPUs[5]
* `nvidia_enable_mig=false` on NVIDIA machines, ensure that MIG is set to `true` or `false` for all GPUs[5]
* `hyperthreading=nothing` Turn on/off hyperthreading on supported machine sizes. The default uses the setting in the template. To override the template setting, use `true` (on) or `false` (off).
# Notes
[1] If `addprocs` is called from an Azure VM, then the default `imagename`,`imageversion` are the
image/version the VM was built with; otherwise, it is the latest version of the image specified in the scale-set template.
[2] `waitfor=false` reflects the fact that the cluster manager is dynamic. After the call to `addprocs` returns, use `workers()`
[2] Interactive threads are supported beginning in version 1.9 of Julia. For earlier versions, the default for `julia_num_threads` is `Threads.nthreads()`.
[3] `waitfor=false` reflects the fact that the cluster manager is dynamic. After the call to `addprocs` returns, use `workers()`
to monitor the size of the cluster.
[3] This is intended for use with Devito. In particular, it allows Devito to gain performance by using
[4] This is intended for use with Devito. In particular, it allows Devito to gain performance by using
MPI to do domain decomposition within a single VM. If `mpi_ranks_per_worker=0`, then MPI is not
used on the Julia workers.
[4] This may result in a reboot of the VMs
[5] This may result in a reboot of the VMs
"""
function Distributed.addprocs(template::Dict, n::Int;
subscriptionid = "",
@@ -616,7 +634,7 @@ function Distributed.addprocs(template::Dict, n::Int;
group = "cbox",
overprovision = true,
ppi = 1,
julia_num_threads = Threads.nthreads(),
julia_num_threads = VERSION >= v"1.9" ? "$(Threads.nthreads()),$(Threads.nthreads(:interactive))" : string(Threads.nthreads()),
omp_num_threads = parse(Int, get(ENV, "OMP_NUM_THREADS", "1")),
env = Dict(),
nretry = 20,
@@ -652,6 +670,8 @@ function Distributed.addprocs(template::Dict, n::Int;

osdisksize = max(osdisksize, image_osdisksize(manager, template["value"], sigimagename, sigimageversion, imagename))

julia_num_threads = nthreads_filter(julia_num_threads)

@info "Provisioning $n virtual machines in scale-set $group..."
_scalesets[scaleset] = scaleset_create_or_update(manager, user, subscriptionid, resourcegroup, group, sigimagename, sigimageversion, imagename, osdisksize,
nretry, template, n, ppi, mpi_ranks_per_worker, mpi_flags, nvidia_enable_ecc, nvidia_enable_mig, hyperthreading, julia_num_threads, omp_num_threads,
@@ -883,18 +903,50 @@ Check to see if the machine `id::Int` has received an Azure spot preempt message
true if a preempt message is received and false otherwise.
"""
function preempted()
_r = HTTP.request("GET", "http://169.254.169.254/metadata/scheduledevents?api-version=2019-08-01", ["Metadata"=>"true"])
_r = HTTP.request("GET", "http://169.254.169.254/metadata/instance/compute?api-version=2021-02-01", ["Metadata"=>"true"]; redirect=false)
r = JSON.parse(String(_r.body))
instanceid = get(r, "name", "")
_r = HTTP.request("GET", "http://169.254.169.254/metadata/scheduledevents?api-version=2020-07-01", ["Metadata"=>"true"]; redirect=false)
r = JSON.parse(String(_r.body))
for event in r["Events"]
if get(event, "EventType", "") == "Preempt"
@info "event=$event"
if get(event, "EventType", "") == "Preempt" && instanceid get(event, "Resources", [])
@warn "Machine with id $(myid()) is being pre-empted" now(Dates.UTC) event["NotBefore"] event["EventType"] event["EventSource"]
return true
end
end
return false
end
preempted(id) = remotecall_fetch(preempted, id)
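
For context, `preempted` walks the IMDS scheduled-events document, which is shaped roughly as follows (field values are illustrative):

```julia
# Illustrative scheduled-events payload; `preempted` returns true when an
# event has EventType == "Preempt" and this VM's name appears in Resources.
Dict(
    "DocumentIncarnation" => 1,
    "Events" => [Dict(
        "EventId"      => "602d9444-d2cd-49c7-8624-8643e7171297",
        "EventType"    => "Preempt",
        "ResourceType" => "VirtualMachine",
        "Resources"    => ["myscaleset_42"],
        "EventStatus"  => "Scheduled",
        "NotBefore"    => "Mon, 11 Sep 2023 18:29:47 GMT"
    )]
)
```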

macro spawn_interactive(ex::Expr)
if VERSION >= v"1.9"
:(Threads.@spawn :interactive $(esc(ex)))
else
:(Threads.@spawn $(esc(ex)))
end
end

function machine_prempt_loop()
if VERSION >= v"1.9" && Threads.nthreads(:interactive) > 0
tsk = @spawn_interactive while true
if preempted()
# self-destruct: Distributed should see that the process has exited and update the cluster book-keeping.
exit()
break
end
sleep(1)
end
try
wait(tsk)
catch e
@info "preempt loop failed"
logerror(e, Logging.Warn)
end
else
@warn "AzManagers is not running the preempt loop for pid=$(myid()) since it requires at least one interactive thread on worker machines."
end
end
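
On the master, this loop is started on each spot worker via `remotecall` (see the `process_pending_connections` change above); the manual equivalent would be:

```julia
# assumes worker pid 2 was provisioned with at least one interactive thread
remotecall(AzManagers.machine_prempt_loop, 2)
```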

function azure_physical_name(keyval="PhysicalHostName")
local physical_hostname
try
@@ -914,7 +966,7 @@ function azure_worker_init(cookie, master_address, master_port, ppi, mpi_size)
nbytes_written == Distributed.HDR_COOKIE_LEN || error("unable to write bytes")
flush(c)

_r = HTTP.request("GET", "http://169.254.169.254/metadata/instance?api-version=2020-06-01", ["Metadata"=>"true"]; redirect=false)
_r = HTTP.request("GET", "http://169.254.169.254/metadata/instance?api-version=2021-02-01", ["Metadata"=>"true"]; redirect=false)
r = JSON.parse(String(_r.body))
vm = Dict(
"bind_addr" => string(getipaddr(IPv4)),
Expand All @@ -924,6 +976,7 @@ function azure_worker_init(cookie, master_address, master_port, ppi, mpi_size)
"resourcegroup" => lowercase(r["compute"]["resourceGroupName"]),
"scalesetname" => lowercase(r["compute"]["vmScaleSetName"]),
"instanceid" => split(r["compute"]["resourceId"], '/')[end],
"priority" => get(r["compute"], "priority", ""),
"localid" => 1,
"name" => r["compute"]["name"],
"mpi" => mpi_size > 0,
@@ -1328,7 +1381,7 @@ function scaleset_image(manager::AzManager, sigimagename, sigimageversion, image

# get machines' metadata
t = @async begin
r = HTTP.request("GET", "http://169.254.169.254/metadata/instance/compute/storageProfile/imageReference?api-version=2019-06-01", ["Metadata"=>"true"]; retry=false, redirect=false)
r = HTTP.request("GET", "http://169.254.169.254/metadata/instance/compute/storageProfile/imageReference?api-version=2021-02-01", ["Metadata"=>"true"]; retry=false, redirect=false)
end
tic = time()
while !istaskdone(t)
@@ -1653,7 +1706,7 @@ function build_envstring(env::Dict)
envstring
end

function buildstartupscript_cluster(manager::AzManager, ppi::Int, mpi_ranks_per_worker::Int, mpi_flags, nvidia_enable_ecc, nvidia_enable_mig, julia_num_threads::Int, omp_num_threads::Int, env::Dict, user::String,
function buildstartupscript_cluster(manager::AzManager, spot::Bool, ppi::Int, mpi_ranks_per_worker::Int, mpi_flags, nvidia_enable_ecc, nvidia_enable_mig, julia_num_threads::String, omp_num_threads::Int, env::Dict, user::String,
disk::AbstractString, custom_environment::Bool)
cmd, remote_julia_environment_name = buildstartupscript(manager, user, disk, custom_environment)

Expand All @@ -1665,35 +1718,46 @@ function buildstartupscript_cluster(manager::AzManager, ppi::Int, mpi_ranks_per_

juliaenvstring = remote_julia_environment_name == "" ? "" : """using Pkg; Pkg.activate(joinpath(Pkg.envdir(), "$remote_julia_environment_name")); """

# if spot is true, then ensure at least one interactive thread on workers so that one can check for spot evictions periodically.
if spot && VERSION >= v"1.9"
_julia_num_threads = split(julia_num_threads, ',')
julia_num_threads_default = length(_julia_num_threads) > 0 ? parse(Int, _julia_num_threads[1]) : 1
julia_num_threads_interactive = length(_julia_num_threads) > 1 ? parse(Int, _julia_num_threads[2]) : 0

if julia_num_threads_interactive == 0
@info "Augmenting `julia_num_threads` option with an interactive thread so it can be used on workers for spot-event polling."
julia_num_threads_interactive = 1
end
julia_num_threads = nthreads_filter("$julia_num_threads_default,$julia_num_threads_interactive")
end

if mpi_ranks_per_worker == 0
cmd *= """
sudo su - $user <<EOF
export JULIA_WORKER_TIMEOUT=$(get(ENV, "JULIA_WORKER_TIMEOUT", "720"))
export JULIA_NUM_THREADS=$julia_num_threads
export OMP_NUM_THREADS=$omp_num_threads
$envstring
julia -e '$(juliaenvstring)using AzManagers; AzManagers.nvidia_gpucheck($nvidia_enable_ecc, $nvidia_enable_mig); AzManagers.mount_datadisks(); AzManagers.azure_worker("$cookie", "$master_address", $master_port, $ppi)'
julia -t $julia_num_threads -e '$(juliaenvstring)using AzManagers; AzManagers.nvidia_gpucheck($nvidia_enable_ecc, $nvidia_enable_mig); AzManagers.mount_datadisks(); AzManagers.azure_worker("$cookie", "$master_address", $master_port, $ppi)'
EOF
"""
else
cmd *= """
sudo su - $user <<EOF
export JULIA_WORKER_TIMEOUT=$(get(ENV, "JULIA_WORKER_TIMEOUT", "720"))
export JULIA_NUM_THREADS=$julia_num_threads
export OMP_NUM_THREADS=$omp_num_threads
$envstring
julia -e '$(juliaenvstring)using AzManagers; AzManagers.nvidia_gpucheck($nvidia_enable_ecc, $nvidia_enable_mig); AzManagers.mount_datadisks()'
mpirun -n $mpi_ranks_per_worker $mpi_flags julia -e '$(juliaenvstring)using AzManagers, MPI; AzManagers.azure_worker_mpi("$cookie", "$master_address", $master_port, $ppi)'
mpirun -n $mpi_ranks_per_worker $mpi_flags julia -t $julia_num_threads -e '$(juliaenvstring)using AzManagers, MPI; AzManagers.azure_worker_mpi("$cookie", "$master_address", $master_port, $ppi)'
EOF
"""
end

cmd
end

function buildstartupscript_detached(manager::AzManager, julia_num_threads::Int, omp_num_threads::Int, env::Dict, user::String,
function buildstartupscript_detached(manager::AzManager, julia_num_threads::String, omp_num_threads::Int, env::Dict, user::String,
disk::AbstractString, custom_environment::Bool, subscriptionid, resourcegroup, vmname)
cmd, remote_julia_environment_name = buildstartupscript(manager, user, disk, custom_environment)

@@ -1706,11 +1770,10 @@ function buildstartupscript_detached(manager::AzManager, julia_num_threads::Int,
sudo su - $user <<EOF
$envstring
export JULIA_WORKER_TIMEOUT=$(get(ENV, "JULIA_WORKER_TIMEOUT", "720"))
export JULIA_NUM_THREADS=$julia_num_threads
export OMP_NUM_THREADS=$omp_num_threads
ssh-keygen -f /home/$user/.ssh/azmanagers_rsa -N '' <<<y
cd /home/$user
julia -e '$(juliaenvstring)using AzManagers; AzManagers.mount_datadisks(); AzManagers.detached_port!($(AzManagers.detached_port())); AzManagers.detachedservice(;subscriptionid="$subscriptionid", resourcegroup="$resourcegroup", vmname="$vmname")'
julia -t $julia_num_threads -e '$(juliaenvstring)using AzManagers; AzManagers.mount_datadisks(); AzManagers.detached_port!($(AzManagers.detached_port())); AzManagers.detachedservice(;subscriptionid="$subscriptionid", resourcegroup="$resourcegroup", vmname="$vmname")'
EOF
"""

@@ -1981,7 +2044,7 @@ function scaleset_create_or_update(manager::AzManager, user, subscriptionid, res
key = Dict("path" => "/home/$user/.ssh/authorized_keys", "keyData" => read(ssh_key, String))
push!(_template["properties"]["virtualMachineProfile"]["osProfile"]["linuxConfiguration"]["ssh"]["publicKeys"], key)

cmd = buildstartupscript_cluster(manager, ppi, mpi_ranks_per_worker, mpi_flags, nvidia_enable_ecc, nvidia_enable_mig, julia_num_threads, omp_num_threads, env, user, template["tempdisk"], custom_environment)
cmd = buildstartupscript_cluster(manager, spot, ppi, mpi_ranks_per_worker, mpi_flags, nvidia_enable_ecc, nvidia_enable_mig, julia_num_threads, omp_num_threads, env, user, template["tempdisk"], custom_environment)
_cmd = base64encode(cmd)

if length(_cmd) > 64_000
@@ -2090,7 +2153,7 @@ end
function mount_datadisks()
try
@info "mounting data disks"
_r = HTTP.request("GET", "http://169.254.169.254/metadata/instance?api-version=2020-06-01", ["Metadata"=>"true"]; redirect=false)
_r = HTTP.request("GET", "http://169.254.169.254/metadata/instance?api-version=2021-02-01", ["Metadata"=>"true"]; redirect=false)
r = JSON.parse(String(_r.body))
luns = String[]
for datadisks in r["compute"]["storageProfile"]["dataDisks"]
@@ -2465,10 +2528,13 @@ Create a VM, and returns a named tuple `(name,ip,resourcegroup,subscriptionid)` w
* `nretry=10` Max retries for re-tryable REST call failures
* `verbose=0` Verbosity flag passes to HTTP.jl methods
* `show_quota=false` after various operations, show the "x-ms-rate-remaining-resource" response header. Useful for debugging/understanding Azure quotas.
* `julia_num_threads=Threads.nthreads()` set `JULIA_NUM_THREADS` environment variable before starting the detached process
* `julia_num_threads="\$(Threads.nthreads(),\$(Threads.nthreads(:interactive))"` set the number of julia threads for the workers.[1]
* `omp_num_threads = get(ENV, "OMP_NUM_THREADS", 1)` set `OMP_NUM_THREADS` environment variable before starting the detached process
* `env=Dict()` Dictionary of environment variables that will be exported before starting the detached process
* `detachedservice=true` start the detached service allowing for RESTful remote code execution
# Notes
[1] Interactive threads are supported beginning in version 1.9 of Julia. For earlier versions, the default for `julia_num_threads` is `Threads.nthreads()`.
"""
function addproc(vm_template::Dict, nic_template=nothing;
name = "",
@@ -2485,7 +2551,7 @@ function addproc(vm_template::Dict, nic_template=nothing;
nretry = 10,
verbose = 0,
show_quota = false,
julia_num_threads = Threads.nthreads(),
julia_num_threads = VERSION >= v"1.9" ? "$(Threads.nthreads()),$(Threads.nthreads(:interactive))" : string(Threads.nthreads()),
omp_num_threads = parse(Int, get(ENV, "OMP_NUM_THREADS", "1")),
env = Dict(),
detachedservice = true)
@@ -2552,7 +2618,7 @@ function addproc(vm_template::Dict, nic_template=nothing;

local cmd
if detachedservice
cmd = buildstartupscript_detached(manager, julia_num_threads, omp_num_threads, env, user,
cmd = buildstartupscript_detached(manager, nthreads_filter(julia_num_threads), omp_num_threads, env, user,
disk, customenv, subscriptionid, resourcegroup, vmname)
else
cmd,_ = buildstartupscript(manager, user, disk, customenv)