Skip to content

Commit

Permalink
clustermanager: implement worker termination
Browse files (browse the repository at this point in the history)
- handle interrupt, finalize and deregister of workers
- release yarn containers and resources on worker termination
  • Loading branch information
tanmaykm committed Jan 8, 2020
1 parent ed3296f commit 58aa882
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 21 deletions.
6 changes: 4 additions & 2 deletions src/api_yarn_appmaster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ kill(yam::YarnAppMaster, diagnostics::AbstractString="") = _unregister(yam, Fina
container_allocate(yam::YarnAppMaster, numcontainers::Int; opts...) = request_alloc(yam.containers, numcontainers; opts...)
container_release(yam::YarnAppMaster, cids::ContainerIdProto...) = request_release(yam.containers, cids...)

container_start(yam::YarnAppMaster, cid::ContainerIdProto, container_spec::ContainerLaunchContextProto) = container_start(yam, yam.containers.containers[cid], container_spec)
container_start(yam::YarnAppMaster, cid::ContainerIdProto, container_spec::ContainerLaunchContextProto) = container_start(yam, container_id_string(cid), container_spec)
container_start(yam::YarnAppMaster, cidstr::String, container_spec::ContainerLaunchContextProto) = container_start(yam, yam.containers.containers[cidstr], container_spec)
function container_start(yam::YarnAppMaster, container::ContainerProto, container_spec::ContainerLaunchContextProto)
@debug("starting", container)
req = StartContainerRequestProto(container_launch_context=container_spec, container_token=container.container_token)
Expand All @@ -197,7 +198,8 @@ function container_start(yam::YarnAppMaster, container::ContainerProto, containe
cid
end

container_stop(yam::YarnAppMaster, cid::ContainerIdProto) = container_stop(yam, yam.containers.containers[cid])
container_stop(yam::YarnAppMaster, cid::ContainerIdProto) = container_stop(yam, container_id_string(cid))
container_stop(yam::YarnAppMaster, cidstr::String) = container_stop(yam, yam.containers.containers[cidstr])
function container_stop(yam::YarnAppMaster, container::ContainerProto)
@debug("stopping", container)

Expand Down
31 changes: 17 additions & 14 deletions src/api_yarn_base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,10 @@ It also holds the allocation and release pipelines that are used by application
Also schedules callbacks as tasks when containers are allocated or terminated.
"""
mutable struct YarnContainers
containers::Dict{ContainerIdProto,ContainerProto}
status::Dict{ContainerIdProto,ContainerStatusProto}
active::Set{ContainerIdProto}
busy::Set{ContainerIdProto}
containers::Dict{String,ContainerProto}
status::Dict{String,ContainerStatusProto}
active::Set{String}
busy::Set{String}

alloc_pipeline::RequestPipeline{ResourceRequestProto}
release_pipeline::RequestPipeline{ContainerIdProto}
Expand All @@ -286,7 +286,7 @@ mutable struct YarnContainers
on_container_finish::Union{Nothing,Function}

function YarnContainers()
new(Dict{ContainerIdProto,ContainerProto}(), Dict{ContainerIdProto,ContainerStatusProto}(), Set{ContainerIdProto}(), Set{ContainerIdProto}(),
new(Dict{String,ContainerProto}(), Dict{String,ContainerStatusProto}(), Set{String}(), Set{String}(),
RequestPipeline{ResourceRequestProto}(), RequestPipeline{ContainerIdProto}(), 0, nothing, nothing)
end
end
Expand All @@ -313,8 +313,9 @@ function update(containers::YarnContainers, arp::AllocateResponseProto)
if isfilled(arp, :allocated_containers)
for cont in arp.allocated_containers
id = cont.id
contlist[id] = cont
push!(active, id)
idstr = container_id_string(id)
contlist[idstr] = cont
push!(active, idstr)
@debug("calling callback for alloc")
(cballoc === nothing) || @async cballoc(id)
end
Expand All @@ -323,11 +324,12 @@ function update(containers::YarnContainers, arp::AllocateResponseProto)
@debug("have completed containers")
for contst in arp.completed_container_statuses
id = contst.container_id
status[id] = contst
@debug("container finished", id, active=(id in active))
(id in active) && pop!(active, id)
(id in busy) && pop!(busy, id)
@debug("calling callback for finish")
idstr = container_id_string(id)
status[idstr] = contst
@debug("container finished", idstr, isactive=(idstr in active), isbusy=(idstr in busy))
(idstr in active) && pop!(active, idstr)
(idstr in busy) && pop!(busy, idstr)
@debug("calling callback for finish", id)
(cbfinish === nothing) || @async cbfinish(id)
end
end
Expand Down Expand Up @@ -359,15 +361,16 @@ end
function set_busy(containers::YarnContainers, cids::ContainerIdProto...)
busy = containers.busy
for cid in cids
push!(busy, cid)
push!(busy, container_id_string(cid))
end
nothing
end

function set_free(containers::YarnContainers, cids::ContainerIdProto...)
busy = containers.busy
for cid in cids
pop!(busy, cid)
cidstr = container_id_string(cid)
(cidstr in busy) && pop!(busy, cidstr)
end
nothing
end
Expand Down
17 changes: 14 additions & 3 deletions src/cluster_manager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ function container_start(manager::YarnManager, cmd::String, env::Dict{String,Str
end
end

container_stop(manager::YarnManager, cid::ContainerIdProto) = container_stop(manager.am, cid)
container_release(manager::YarnManager, cid::ContainerIdProto) = container_release(manager.am, cid)

function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Condition)
@debug("YarnManager launch", params)

Expand Down Expand Up @@ -179,10 +182,18 @@ function launch(manager::YarnManager, params::Dict, instances_arr::Array, c::Con
end

function manage(manager::YarnManager, id::Integer, config::WorkerConfig, op::Symbol)
@debug("YarnManager manage", id, op, container=config.userdata, nprocs=nprocs())
# This function needs to exist, but so far we don't do anything
if op == :deregister
@debug("YarnManager manage", id, op, nprocs=nprocs())
!manager.keep_connected && (1 == nprocs()) && (@async disconnect(manager))
if op in [:deregister, :finalize, :interrupt]
if (config.userdata !== nothing) && haskey(config.userdata, :container_id)
cid = config.userdata[:container_id]
cidstr = container_id_string(cid)
if cidstr in manager.am.containers.active
container_stop(manager, cid)
container_release(manager, cid)
end
end
!manager.keep_connected && (1 == nprocs()) && disconnect(manager)
end
nothing
end
4 changes: 2 additions & 2 deletions test/yarntests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ function make_julia_env()
end
end
if !("JULIA_LOAD_PATH" in keys(env))
home = ENV["HOME"]
env["JULIA_LOAD_PATH"] = join(Base.LOAD_PATH, ':') * ":$(home)/.julia/dev:$(home)/.julia/packages"
env["JULIA_LOAD_PATH"] = join([Base.LOAD_PATH..., joinpath(Base.homedir(), ".julia", "dev"), joinpath(Base.homedir(), ".julia", "packages")], ':')
end
if !("JULIA_DEPOT_PATH" in keys(env))
env["JULIA_DEPOT_PATH"] = join(Base.DEPOT_PATH, ':')
Expand All @@ -84,6 +83,7 @@ function test_yarn_clustermanager(yarncm::YarnManager, limitedtestenv::Bool)
@everywhere println("hi")
rmprocs(workers())
@test nprocs() == 1
@test yarncm.am.registration !== nothing # because keep_connected is true by default
nothing
end

Expand Down

0 comments on commit 58aa882

Please sign in to comment.